6.1 BrokerConfig
/**
 * Plain holder for broker settings; every field carries its default value inline.
 *
 * <p>Fields tagged with {@code @ImportantField} are marked as such by the project;
 * what the annotation triggers is not visible from this class.
 */
public class BrokerConfig {
    private static final InternalLogger log =
        InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    // ---- Installation, name server and broker identity ----

    // The system property wins; the environment variable is only the fallback value.
    private String rocketmqHome =
        System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // Same lookup order as rocketmqHome: system property first, then environment variable.
    @ImportantField
    private String namesrvAddr =
        System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
    // Both IP fields default to the address reported by RemotingUtil.getLocalAddress().
    @ImportantField
    private String brokerIP1 = RemotingUtil.getLocalAddress();
    private String brokerIP2 = RemotingUtil.getLocalAddress();
    @ImportantField
    private String brokerName = localHostName();
    @ImportantField
    private String brokerClusterName = "DefaultCluster";
    @ImportantField
    private long brokerId = MixAll.MASTER_ID;
    // Read and write permission bits are both set by default.
    private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;

    // ---- Topic / subscription group creation and tracing ----

    private int defaultTopicQueueNums = 8;
    @ImportantField
    private boolean autoCreateTopicEnable = true;
    private boolean clusterTopicEnable = true;
    private boolean brokerTopicEnable = true;
    @ImportantField
    private boolean autoCreateSubscriptionGroup = true;
    private String messageStorePlugIn = "";
    @ImportantField
    private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
    @ImportantField
    private boolean traceTopicEnable = false;

    // ---- Thread pool sizes (several scale with the available processor count) ----

    private int sendMessageThreadPoolNums = 1;
    private int pullMessageThreadPoolNums = Runtime.getRuntime().availableProcessors() * 2 + 16;
    private int queryMessageThreadPoolNums = Runtime.getRuntime().availableProcessors() + 8;
    private int adminBrokerThreadPoolNums = 16;
    private int clientManageThreadPoolNums = 32;
    private int consumerManageThreadPoolNums = 32;
    // Capped at 32 regardless of how many processors are reported.
    private int heartbeatThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 32);
    private int endTransactionThreadPoolNums = Runtime.getRuntime().availableProcessors() * 2 + 8;

    // ---- Consumer offset flushing (values presumably in milliseconds; confirm at use site) ----

    private int flushConsumerOffsetInterval = 5 * 1000;
    private int flushConsumerOffsetHistoryInterval = 60 * 1000;

    @ImportantField
    private boolean rejectTransactionMessage = false;
    @ImportantField
    private boolean fetchNamesrvAddrByAddressServer = false;

    // ---- Thread pool queue capacities ----

    private int sendThreadPoolQueueCapacity = 10000;
    private int pullThreadPoolQueueCapacity = 100000;
    private int queryThreadPoolQueueCapacity = 20000;
    private int clientManagerThreadPoolQueueCapacity = 1000000;
    private int consumerManagerThreadPoolQueueCapacity = 1000000;
    private int heartbeatThreadPoolQueueCapacity = 50000;
    private int endTransactionPoolQueueCapacity = 100000;

    private int filterServerNums = 0;

    // ---- Polling and consumer notification ----

    private boolean longPollingEnable = true;
    private long shortPollingTimeMills = 1000;
    private boolean notifyConsumerIdsChangedEnable = true;
    private boolean highSpeedMode = false;

    // ---- "Commercial" counters ----

    private boolean commercialEnable = true;
    private int commercialTimerCount = 1;
    private int commercialTransCount = 1;
    private int commercialBigCount = 1;
    private int commercialBaseCount = 1;

    private boolean transferMsgByHeap = true;
    private int maxDelayTime = 40;
    private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
    private int registerBrokerTimeoutMills = 6000;
    private boolean slaveReadEnable = false;
    private boolean disableConsumeIfConsumerReadSlowly = false;
    // 16 * 1024^3; the multiplication is done in long arithmetic so it cannot overflow int.
    private long consumerFallbehindThreshold = 16L * 1024 * 1024 * 1024;

    // ---- Fast-failure switch and per-queue wait times ----

    private boolean brokerFastFailureEnable = true;
    private long waitTimeMillsInSendQueue = 200;
    private long waitTimeMillsInPullQueue = 5 * 1000;
    private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
    private long waitTimeMillsInTransactionQueue = 3 * 1000;
    private long startAcceptSendRequestTimeStamp = 0L;
    private boolean traceOn = true;

    // ---- Message filtering ----

    private boolean enableCalcFilterBitMap = false;
    private int expectConsumerNumUseFilter = 32;
    private int maxErrorRateOfBloomFilter = 20;
    // 24 * 3600 * 1000 = 86,400,000, which still fits in an int before widening to long.
    private long filterDataCleanTimeSpan = 24 * 3600 * 1000;
    private boolean filterSupportRetry = false;
    private boolean enablePropertyFilter = false;

    // ---- Name server registration ----

    private boolean compressedRegister = false;
    private boolean forceRegister = true;
    private int registerNameServerPeriod = 30 * 1000;

    // ---- Transaction checking ----

    @ImportantField
    private long transactionTimeOut = 6 * 1000;
    @ImportantField
    private int transactionCheckMax = 15;
    @ImportantField
    private long transactionCheckInterval = 60 * 1000;

    @ImportantField
    private boolean aclEnable = false;
}
6.2 MixAll
/**
 * Central collection of shared constants: environment/property keys, well-known
 * group and topic names, and a few values resolved once when the class is initialized.
 */
public class MixAll {
    private static final InternalLogger log =
        InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    // ---- Environment variable names and system property keys ----

    public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
    public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
    public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
    public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
    public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel";

    // ---- Name server lookup domain ----

    public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net";
    // Overridable through the "rocketmq.namesrv.domain" system property.
    public static final String WS_DOMAIN_NAME =
        System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    // Overridable through the "rocketmq.namesrv.domain.subgroup" system property.
    public static final String WS_DOMAIN_SUBGROUP =
        System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");

    // ---- Built-in topic and group names ----

    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
    public static final String BENCHMARK_TOPIC = "BenchmarkTest";
    public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
    public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
    public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
    public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
    public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
    public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
    public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP";
    public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP";
    public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
    public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
    public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
    public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
    public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
    public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
    public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";

    // ---- Values computed once at class initialization ----
    // Declaration order is kept as-is: static initializers run top to bottom.

    public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
    public static final String LOCALHOST = localhost();
    public static final String DEFAULT_CHARSET = "UTF-8";
    public static final long MASTER_ID = 0L;
    public static final long CURRENT_JVM_PID = getPID();

    // ---- Topic prefixes, system topics and miscellaneous keys ----

    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
    public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
    public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
    public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
    public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
    public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
    public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
    public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
}
6.3 BrokerPathConfigHelper
/**
 * Produces the file paths of the broker's configuration files.
 *
 * <p>The broker properties path lives in a mutable static field whose initial value is
 * {@code <user.home>/store/config/broker.properties}. All other paths are derived from a
 * root directory supplied by the caller, using {@link File#separator} between segments.
 */
public class BrokerPathConfigHelper {
    /** Name of the sub-directory that holds every configuration file. */
    private static final String CONFIG_DIR = "config";

    private static String brokerConfigPath = defaultBrokerConfigPath();

    /**
     * @return the currently configured path of the broker properties file
     */
    public static String getBrokerConfigPath() {
        return brokerConfigPath;
    }

    /**
     * Replaces the broker properties path; the value is stored without validation.
     *
     * @param path new path of the broker properties file
     */
    public static void setBrokerConfigPath(String path) {
        brokerConfigPath = path;
    }

    /**
     * @param rootDir root directory, used as-is as the path prefix
     * @return {@code <rootDir>/config/topics.json}
     */
    public static String getTopicConfigPath(final String rootDir) {
        return configFilePath(rootDir, "topics.json");
    }

    /**
     * @param rootDir root directory, used as-is as the path prefix
     * @return {@code <rootDir>/config/consumerOffset.json}
     */
    public static String getConsumerOffsetPath(final String rootDir) {
        return configFilePath(rootDir, "consumerOffset.json");
    }

    /**
     * @param rootDir root directory, used as-is as the path prefix
     * @return {@code <rootDir>/config/subscriptionGroup.json}
     */
    public static String getSubscriptionGroupPath(final String rootDir) {
        return configFilePath(rootDir, "subscriptionGroup.json");
    }

    /**
     * @param rootDir root directory, used as-is as the path prefix
     * @return {@code <rootDir>/config/consumerFilter.json}
     */
    public static String getConsumerFilterPath(final String rootDir) {
        return configFilePath(rootDir, "consumerFilter.json");
    }

    /** Builds the initial broker properties path underneath the user's home directory. */
    private static String defaultBrokerConfigPath() {
        final String storeDir = System.getProperty("user.home") + File.separator + "store";
        return storeDir + File.separator + CONFIG_DIR + File.separator + "broker.properties";
    }

    /** Joins root directory, the config sub-directory and a file name with the platform separator. */
    private static String configFilePath(final String rootDir, final String fileName) {
        // Plain concatenation on purpose: a null rootDir yields the text "null", not an exception.
        return rootDir + File.separator + CONFIG_DIR + File.separator + fileName;
    }
}