0
点赞
收藏
分享

微信扫一扫

源码分析RocketMQ消息过滤机制下篇-FilterServer、ClassFilter模式详解

君心浅语 2022-04-24 阅读 72

return true;

}

if (subscriptionData.isClassFilterMode()) {

return true;

}

if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

return true;

}

ConsumerFilterData realFilterData = this.consumerFilterData;

Map<String, String> tempProperties = properties;

// no expression

if (realFilterData == null || realFilterData.getExpression() == null

|| realFilterData.getCompiledExpression() == null) {

return true;

}

if (tempProperties == null && msgBuffer != null) { // @1

tempProperties = MessageDecoder.decodeProperties(msgBuffer);

}

Object ret = null;

try {

MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

ret = realFilterData.getCompiledExpression().evaluate(context); // @2

} catch (Throwable e) {

log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);

}

log.debug(“Pull eval result: {}, {}, {}”, ret, realFilterData, tempProperties);

if (ret == null || !(ret instanceof Boolean)) {

return false;

}

return (Boolean) ret;

}

代码@1:从消息体中解码出属性。

代码@2:然后对表达式进行匹配,上下文环境为消息体中的属性,如果匹配,则返回true,否则返回false。

2、ClassFilter 消息过滤机制 FilterServer详解

===================================

从图中可以看出,如果使用了类模式过滤,Consumer 不是直接从Broker拉取,而是从FilterServer上拉取。那么问题来了,FilterServer 是什么、Consume 如何与 FilterServer 打交道。

我们知道,一个客户端,一个专门的消息拉取线程(PullMessageService)专门负责拉取消息,多种过滤模式公用一套消息拉取机制【消息队列负载机制】,那 ClassFilter 模式是如何工作呢?首先,ClassFilter 模式,顾名思义就是消费端可以上传一个Class类文件到 FilterServer, 然后 FilterServer 从 Broker  拉取消息,执行过滤逻辑然后再返回给Consumer。

ClassFilter模式过滤机制,本文从如下三个方面展开。

1)ClassFilter注册(消费端如何提交自己的消息过滤实现类、以及消费订阅信息注册)。

2)消费端如何路由到FilterServer上拉取消息。

3)FilterServer消息拉取与消息消费。

2.1 ClassFilter模式 消息过滤类注册机制


DefaultMQPushConsumerImpl#subscribe

public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { // @1

try {

SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),

topic, “*”);

subscriptionData.setSubString(fullClassName);

subscriptionData.setClassFilterMode(true); // @2

subscriptionData.setFilterClassSource(filterClassSource);

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); // @3

if (this.mQClientFactory != null) {

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // @4

}

} catch (Exception e) {

throw new MQClientException(“subscription exception”, e);

}

}

代码@1:topic : 主题,fullClassName : 过滤类全类路径名,filterClassSource: 过滤类内容。

代码@2:设置 classFilterMode 为 true,表示类过滤机制。

代码@3:将该主题的订阅信息放入到 RebalanceImp l对象中,一个消费者各自维护一个 RebalanceImpl 对象,用于创建消息拉取任务。

代码@4:sendHeartbeatToAllBrokerWithLock,关键,发送心跳到所有Broker。

MQClientInstance#sendHeartbeatToAllBrokerWithLock

重点关注MQClientInstance#uploadFilterClassSource方法:

继续进入MQClientInstance#uploadFilterClassToAllFilterServer方法。

private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,

final String topic,

final String filterClassSource) throws UnsupportedEncodingException {

byte[] classBody = null;

int classCRC = 0;

try {

classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);

classCRC = UtilAll.crc32(classBody);

} catch (Exception e1) {

log.warn(“uploadFilterClassToAllFilterServer Exception, ClassName: {} {}”,

fullClassName,

RemotingHelper.exceptionSimpleDesc(e1));

} // @1

TopicRouteData topicRouteData = this.topicRouteTable.get(topic);

if (topicRouteData != null

&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { // @2

Iterator<Entry<String, List>> it = topicRouteData.getFilterServerTable().entrySet().iterator();

while (it.hasNext()) {

Entry<String, List> next = it.next();

List value = next.getValue();

for (final String fsAddr : value) {

try {

this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,

5000); // @3

log.info(“register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}”, fsAddr, consumerGroup,

topic, fullClassName);

} catch (Exception e) {

log.error(“uploadFilterClassToAllFilterServer Exception”, e);

}

}

}

} else {

log.warn(“register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}”,

consumerGroup, topic, fullClassName);

}

}

代码@1:将代码转换成字节数值。

代码@2:根据主题找到路由信息,如果路由信息中的filterServerTable不为空,则通过网络将classname,class内容注册到FilterServer中。这里不免有一 个疑问:TopicRouteInfo中的 filterserver 地址从何而来?我们先简单了解一下代码@3,再来分析 这个问题,也就是FilterServer注册机制。

代码@3:registerMessageFilterClass,向路由信息中包含的 FilterServer 服务器注册过滤类,该方法主要是构建RequestCode.REGISTER_MESSAGE_FILTER_CLASS 消息,发往FilterServer。具体处理逻辑,在FilterServer端。

2.1.1 FilterClassManager 源码分析

FilterServer收到REGISTER_MESSAGE_FILTER_CLASS,完成类的注册与类加载。

2.1.1.1 FilterClassManager#registerFilterClass

public boolean registerFilterClass(final String consumerGroup, final String topic,

final String className, final int classCRC, final byte[] filterSourceBinary) { // @1

final String key = buildKey(consumerGroup, topic); // @2

boolean registerNew = false;

FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);

if (null == filterClassInfoPrev) { // @3

registerNew = true;

} else {

if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { // @4

if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) {

registerNew = true;

}

}

}

if (registerNew) { // @5

synchronized (this.compileLock) {

filterClassInfoPrev = this.filterClassTable.get(key);

if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) { //@6

return true;

}

try {

FilterClassInfo filterClassInfoNew = new FilterClassInfo();

filterClassInfoNew.setClassName(className);

filterClassInfoNew.setClassCRC(0);

filterClassInfoNew.setMessageFilter(null);

if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) { // @7

String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);

Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);

Object newInstance = newClass.newInstance();

filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);

filterClassInfoNew.setClassCRC(classCRC);

}

this.filterClassTable.put(key, filterClassInfoNew);

} catch (Throwable e) {

String info =

String

.format(

“FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s”,

consumerGroup, topic, className);

log.error(info, e);

return false;

}

}

}

return true;

}

代码@1:consumerGroup 消费组名称;topic:消费主题;className:过滤类; classCRC:过滤类crc,filterSourceBinary 过滤类内容字节数组。

代码@2:构建 FilterClass 信息的缓存key,主题名 + “@” + 消费组名。

代码@3:如果当前不存在该key的过滤器信息,则认为是第一次注册。

代码@4:如果允许客户端编译上传的类,并且原先的过滤信息的crc与新的额crc不一样,也认为是第一次注册,将覆盖原先的注册信息。

代码@5:加锁,防止并发修改注册信息Map。

代码@6:这里是双重检查(并发编程通用的手段),例如,同一个消费组多个消费者同时注册,进行排队,一个处理好了之后,其他的获取锁,再检查一次,避免重复操作。

代码@7: 如果允许客户端编译上传的类(clientUploadFilterClassEnable=true),则根据过滤类名,过滤类源代码,利用jdk提供的编译API(JavaCompiler),具体封装类(DynaCode),将类编译好,如果不允许编译的话(clientUploadFilterClassEnable=false),就只是 收集这些信息,真正的类加载需要去服务器去下载,然后再编译。这里主要是基于安全考虑,因为允许消费者(应用程序)直接上传JAVA类,本身就是一件危险的事情。那如果clientUploadFilterClassEnable=false,那如何编译呢?

2.1.1.2 FilterClassManager#fetchClassFromRemoteHost

public void start() {

if (!this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

fetchClassFromRemoteHost();

}

}, 1, 1, TimeUnit.MINUTES); // @1

}

}

private void fetchClassFromRemoteHost() {

Iterator<Entry<String, FilterClassInfo>> it = this.filterClassTable.entrySet().iterator();

while (it. 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 hasNext()) {

try {

Entry<String, FilterClassInfo> next = it.next();

FilterClassInfo filterClassInfo = next.getValue();

String[] topicAndGroup = next.getKey().split(“@”);

String responseStr =

this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],

filterClassInfo.getClassName()); // @2

byte[] filterSourceBinary = responseStr.getBytes(“UTF-8”);

int classCRC = UtilAll.crc32(responseStr.getBytes(“UTF-8”));

if (classCRC != filterClassInfo.getClassCRC()) {

String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);

Class<?> newClass =

DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);

Object newInstance = newClass.newInstance();

filterClassInfo.setMessageFilter((MessageFilter) newInstance);

filterClassInfo.setClassCRC(classCRC);

举报

相关推荐

0 条评论