最近项目中有一个关于MQ消息消费和java线程池的应用场景,如下:
业务进程:实现基础业务功能,将租户业务拆分成设备配置,通过MQ与设备适配层进行消息通知。
MQ:使用主流MQ中间件实现。保存的消息为租户业务拆分后的设备配置信息。
设备适配层:消息消费者,消费消息(单线程消费),将消息中的设备配置信息下发到对应的设备上(多线程处理配置下发)。
基本的消息体如上图所示,业务进程根据租户业务拆分成一个(或多个)设备的多个配置项,每台设备的配置项之间有配置顺序要求:config1->config2->config3->config4。业务进程按照配置顺序将配置信息写入MQ中,此时MQ中的消息是有序的。
设备适配层作为消息消费者,读取MQ中的设备配置消息,使用线程池来进行设备配置处理,提高消息处理速度。此时,如何在多线程并行的情况下保证同一台设备的配置下发顺序,即在多线程并行处理时如何保证MQ中同类型消息(设备ip相同)的保序消费呢???
因为消息可以根据设备ip进行区分,这时我想到如果可以把包含同一个设备ip的消息放入到一个线程中处理就可以解决该问题了。- _ -
那么如何做呢???或者说如何设计Runnable的接口实现??(Thread执行的Task)
上面写了一大堆文字,作为码农的我真心感觉有点累,下面先上代码吧:
还是代码看着亲切啊!!上面的代码就是我模拟的设备适配层中消费消息的功能:使用ScheduledThreadPoolExecutor这个线程池来处理设备配置下发操作。以5s周期执行OperationTask任务。
OperationTask实现了Runnable接口,它的run方法执行的就是配置下发操作,使用while循环判断tasks中是否存在配置下发任务,使用tasks.pop()方法获取配置下发消息。注意,这里使用了LinkedList来保存设备配置任务。LinkedList有FIFO特性,可以保证配置任务的顺序。
为什么使用while+pop来处理任务,不使用for/Iterator+remove来处理任务呢??
消息消费使用的是线程池,由一个主线程将消息写入到对应的worker线程的队列中,worker线程又要从队列中取数据进行处理,此时使用for/Iterator+remove操作进行任务处理时会因为主线程写入数据到队列,导致List中length和index的变化,导致循环失败。
我这里使用Iterator进行循环,使用remove清除队列中的已完成消息,会导致线程池中线程挂掉。。。具体原因还没有找到,后面找到再更新到这里。
将设备ip进行hash+mod计算,分配task到对应的OperationTask的任务队列中,调用task的addTask方法。
ipList模拟设备配置消息,包含设备ip。通过调用dispatchThread方法模拟消息消费,每次消费ipList.length个消息,然后进行运算写入到各task的队列中。Schedule线程池执行task任务,周期性消费task的队列(LinkedList)中的消息。
结果:
通过结果看到,总共执行了28个task,与dispatchThread分配的task个数相同。
第一次通过简书记录自己的coding生活,有不到位的地方欢迎大家指正,同时有更好的解决方式希望大佬们不吝赐教,多谢!