The Most Hardcore Series on the Site: A Million-Word Deep Dive into the RocketMQ Source Code, Updated Regularly (Part 75)

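Scheduled task that persists the consumer filter manager's data (initial delay of 10 seconds, then every 10 seconds):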

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerFilterManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumer filter error.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
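The scheduled tasks in this part wrap their whole body in try/catch (Throwable). A likely reason is the documented behavior of scheduleAtFixedRate: if one execution throws, all later executions are suppressed. The following is a minimal sketch of that behavior, not broker code; the class GuardedScheduleDemo and the method countRuns are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RocketMQ.
class GuardedScheduleDemo {

    /**
     * Schedules a task that fails on every run and returns how many times
     * it was executed during the observation window.
     *
     * @param guard if true, the task body is wrapped in try/catch (Throwable)
     */
    static int countRuns(boolean guard, long periodMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();

        // A task body that always throws after recording the run.
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };

        Runnable task;
        if (guard) {
            // Same shape as the broker's tasks: catch everything so the
            // periodic schedule survives a failing run.
            task = () -> {
                try {
                    failing.run();
                } catch (Throwable e) {
                    // The broker logs here; the sketch just swallows the error.
                }
            };
        } else {
            task = failing;
        }

        scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false, 20, 300));
        System.out.println("guarded runs:   " + countRuns(true, 20, 300));
    }
}
```

Without the guard the task runs once and is then silently cancelled; with the guard it keeps firing on every period.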

Scheduled task for broker protection (initial delay of 3 minutes, then every 3 minutes):

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.protectBroker();
        } catch (Throwable e) {
            log.error("protectBroker error.", e);
        }
    }
}, 3, 3, TimeUnit.MINUTES);

Scheduled task that prints the watermark (initial delay of 10 seconds, then every second):

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printWaterMark();
        } catch (Throwable e) {
            log.error("printWaterMark error.", e);
        }
    }
}, 10, 1, TimeUnit.SECONDS);

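printWaterMark() logs the current size of four thread pool queues: the send-message queue, the pull-message queue, the query-message queue, and the transaction queue. Each log line also carries a SlowTimeMills value for the task at the head of that queue.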

public void printWaterMark() {
    LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
    LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
    LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
    LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
}
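The helpers such as headSlowTimeMills4SendThreadPoolQueue() are not shown in this part. As a rough sketch only, assuming SlowTimeMills means how long the task at the head of a queue has been waiting, the idea could look like the following. TimedTask, headSlowTimeMills and formatWaterMark are hypothetical names invented for this illustration and are not the broker's actual classes or methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not RocketMQ code.
class WaterMarkSketch {

    /** Hypothetical stand-in for a queued request that remembers its creation time. */
    static final class TimedTask implements Runnable {
        final long createTimestamp;

        TimedTask(long createTimestamp) {
            this.createTimestamp = createTimestamp;
        }

        @Override
        public void run() {
            // No-op: only the timestamp matters for this sketch.
        }
    }

    /** Assumed meaning of SlowTimeMills: waiting time of the head task, 0 if none. */
    static long headSlowTimeMills(BlockingQueue<Runnable> queue, long nowMillis) {
        Runnable head = queue.peek(); // peek() does not remove the element
        if (!(head instanceof TimedTask)) {
            return 0;
        }
        long slow = nowMillis - ((TimedTask) head).createTimestamp;
        return Math.max(slow, 0);
    }

    /** Builds one line in the same layout as the log statements above. */
    static String formatWaterMark(String name, BlockingQueue<Runnable> queue, long nowMillis) {
        return String.format("[WATERMARK] %s Queue Size: %d SlowTimeMills: %d",
            name, queue.size(), headSlowTimeMills(queue, nowMillis));
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> sendQueue = new LinkedBlockingQueue<>();
        long now = System.currentTimeMillis();
        sendQueue.add(new TimedTask(now - 800)); // head has waited about 800 ms
        sendQueue.add(new TimedTask(now - 300));
        System.out.println(formatWaterMark("Send", sendQueue, now));
    }
}
```

A growing queue size together with a growing head waiting time would suggest that the corresponding thread pool is not keeping up with incoming requests.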