0
点赞
收藏
分享

微信扫一扫

[数据库中间件-Mycat 1.6.7.6-release源码解析系列]-2-启动入口MycatStartup的main方法


2-启动入口MycatStartup的main方法

/**
 * Process entry point for Mycat: initializes the optional ZooKeeper
 * configuration, verifies the Mycat home directory, then boots the
 * {@code MycatServer} singleton. Any startup failure is logged and the
 * JVM exits with status -1.
 */
public final class MycatStartup {

    /** Timestamp pattern used when logging a fatal startup failure. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
    private static final Logger LOGGER = LoggerFactory.getLogger(MycatStartup.class);

    public static void main(String[] args) {
        // Initialize ZooKeeper-backed configuration when it is enabled.
        ZkConfig.getInstance().initZk();
        try {
            // The home directory must be resolvable before anything else runs.
            String homePath = SystemConfig.getHomePath();
            if (homePath == null) {
                System.out.println(SystemConfig.SYS_HOME + " is not set.");
                System.exit(-1);
            }

            // Fetch the server singleton, run its pre-start hook, then start it.
            MycatServer mycatServer = MycatServer.getInstance();
            mycatServer.beforeStart();
            mycatServer.startup();
            System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");
        } catch (Exception e) {
            // Log with an explicit timestamp and abort the process.
            String now = new SimpleDateFormat(DATE_PATTERN).format(new Date());
            LOGGER.error(now + " startup error", e);
            System.exit(-1);
        }
    }
}

如果使用Zookeeper的话前面先初始化Zookeeper相关配置,然后初始化MycatServer,相关代码如下:

MycatServer server = MycatServer.getInstance();
server.beforeStart();

// startup
server.startup();

先获取MycatServer单例对象

那就先来看下初始化MycatServer对象代码:

/**
 * Private constructor for the MycatServer singleton: wires up configuration,
 * schedulers, caches, routing, SQL interception and the catlet class loader
 * before startup() is invoked.
 */
private MycatServer() {

// Load the file-based configuration (schema.xml, rule.xml and server.xml).
this.config = new MycatConfig();

// Single-threaded pool for general scheduled tasks.
scheduler = Executors.newSingleThreadScheduledExecutor();

// Heartbeats get a dedicated single-threaded scheduler so other tasks
// cannot delay them.
heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

// Recorder that keeps a sorted sample of SQL statements (size from server.xml).
this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount());

/**
 * Online flag, toggled through the manager commands:
 * | offline | Change MyCat status to OFF |
 * | online  | Change MyCat status to ON  |
 */
this.isOnline = new AtomicBoolean(true);

// Result-set cache service.
cacheService = new CacheService();

// Route service; uses the cache service for route caching.
routerService = new RouteService(cacheService);

// load datanode active index from properties
dnIndexProperties = loadDnIndexProps();
try {
// Instantiate the configured SQL interceptor via reflection; a bad class
// name is fatal for the server, hence the wrapped rethrow.
sqlInterceptor = (SQLInterceptor) Class.forName(
config.getSystem().getSqlInterceptor()).newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}

// Class loader that periodically re-checks catlet classes under
// $MYCAT_HOME/catlet (check interval from server.xml).
catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
+ File.separator + "catlet", config.getSystem().getCatletClassCheckSeconds());

// Record the startup timestamp.
this.startupTime = TimeUtil.currentTimeMillis();
// When ZooKeeper-based switching is enabled, a distributed lock guards
// updates to the dnindex file.
// NOTE(review): assumes the ZK connection is already available here — confirm.
if(isUseZkSwitch()) {
String path= ZKUtils.getZKBasePath()+"lock/dnindex.lock";
dnindexLock = new InterProcessMutex(ZKUtils.getConnection(), path);
}

}

初始化配置MycatConfig

直接来看下构造器代码:

/**
 * Builds the runtime configuration by delegating to ConfigInitializer
 * (which parses schema.xml, rule.xml and server.xml), then links each
 * physical dataHost pool to the schemas that reference it.
 */
public MycatConfig() {
    // Parse all configuration files, dataHosts included.
    ConfigInitializer initializer = new ConfigInitializer(true);
    this.system = initializer.getSystem();
    this.users = initializer.getUsers();
    this.schemas = initializer.getSchemas();
    this.dataHosts = initializer.getDataHosts();
    this.dataNodes = initializer.getDataNodes();

    // Tell every physical DB pool which schemas it serves.
    for (PhysicalDBPool pool : dataHosts.values()) {
        pool.setSchemas(getDataNodeSchemasOfDataHost(pool.getHostName()));
    }

    this.firewall = initializer.getFirewall();
    this.cluster = initializer.getCluster();

    // Book-keeping for the reload / rollback manager commands.
    this.reloadTime = TimeUtil.currentTimeMillis();
    this.rollbackTime = -1L;
    this.status = RELOAD;

    // Guards concurrent reloads of this configuration.
    this.lock = new ReentrantLock();
}

ConfigInitializer

读取配置的ConfigInitializer类型的构造器

/**
 * Parses every Mycat configuration file: rule.xml and schema.xml via the
 * schema loader, server.xml via the config loader, and then bootstraps the
 * configured global-sequence handler before self-checking the result.
 *
 * @param loadDataHost whether dataHost/dataNode definitions should be
 *                     (re)loaded as well
 */
public ConfigInitializer(boolean loadDataHost) {
    // rule.xml and schema.xml
    SchemaLoader schemaLoader = new XMLSchemaLoader();
    // server.xml (wraps the schema loader)
    XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
    schemaLoader = null;

    // Pull the parsed sections out of the loader.
    this.system = configLoader.getSystemConfig();
    this.users = configLoader.getUserConfigs();
    this.schemas = configLoader.getSchemaConfigs();

    // Optionally rebuild the physical hosts and their data nodes.
    if (loadDataHost) {
        this.dataHosts = initDataHosts(configLoader);
        this.dataNodes = initDataNodes(configLoader);
    }

    // Firewall (privilege) and cluster settings.
    this.firewall = configLoader.getFirewallConfig();
    this.cluster = initCobarCluster(configLoader);

    // Bootstrap whichever global-sequence handler is configured.
    int sequenceType = system.getSequnceHandlerType();
    if (sequenceType == SystemConfig.SEQUENCEHANDLER_MYSQLDB) {
        IncrSequenceMySQLHandler.getInstance().load();
    }
    if (sequenceType == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) {
        IncrSequenceTimeHandler.getInstance().load();
    }
    if (sequenceType == SystemConfig.SEQUENCEHANDLER_ZK_DISTRIBUTED) {
        DistributedSequenceHandler.getInstance(system).load();
    }
    if (sequenceType == SystemConfig.SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT) {
        IncrSequenceZKHandler.getInstance().load();
    }

    // Sanity-check the combined configuration.
    this.selfChecking0();
}

XMLSchemaLoader

XMLSchemaLoader 加载xml的文件

public XMLSchemaLoader(String schemaFile, String ruleFile) {
//先读取rule.xml
XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
//将tableRules拿出,用于这里加载Schema做rule有效判断,以及之后的分片路由计算
this.tableRules = ruleLoader.getTableRules();
//释放ruleLoader
ruleLoader = null;
this.dataHosts = new HashMap<String, DataHostConfig>();
this.dataNodes = new HashMap<String, DataNodeConfig>();
this.schemas = new HashMap<String, SchemaConfig>();
//读取加载schema配置 schema.xml
this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}

load方法加载schema信息

/**
 * Parses the schema XML file, validated against its DTD, and loads the
 * three sections in dependency order: dataHosts, then the dataNodes that
 * reference them, then the schemas that reference the dataNodes.
 *
 * @throws ConfigException if parsing or any load step fails
 */
private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;
    try {
        dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
        xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
        // Build the validated DOM tree.
        Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
        loadDataHosts(root);
        loadDataNodes(root);
        loadSchemas(root);
    } catch (ConfigException e) {
        throw e;
    } catch (Exception e) {
        throw new ConfigException(e);
    } finally {
        closeQuietly(dtd);
        closeQuietly(xml);
    }
}

/** Closes a stream, deliberately ignoring any close-time IOException. */
private static void closeQuietly(InputStream in) {
    if (in != null) {
        try {
            in.close();
        } catch (IOException ignored) {
            // best-effort close; nothing useful to do here
        }
    }
}

XMLRuleLoader 解析rule.xml:

public XMLRuleLoader(String ruleFile) {
// this.rules = new HashSet<RuleConfig>();
//rule名 -> rule
this.tableRules = new HashMap<String, TableRuleConfig>();
//function名 -> 具体分片算法
this.functions = new HashMap<String, AbstractPartitionAlgorithm>();
//rule.dtd,rule.xml
load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile);
}

load方法

/**
 * Parses rule.xml (validated against rule.dtd) and loads, in order, the
 * partitioning functions and then the table rules that reference them.
 *
 * @throws ConfigException if parsing or any load step fails
 */
private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;
    try {
        // Both resources are resolved from this class's class loader.
        dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile);
        xml = XMLRuleLoader.class.getResourceAsStream(xmlFile);
        // Build the validated DOM tree.
        Element root = ConfigUtil.getDocument(dtd, xml)
                .getDocumentElement();
        // Functions must exist before the rules that reference them.
        loadFunctions(root);
        loadTableRules(root);
    } catch (ConfigException e) {
        throw e;
    } catch (Exception e) {
        throw new ConfigException(e);
    } finally {
        closeQuietly(dtd);
        closeQuietly(xml);
    }
}

/** Closes a stream, deliberately ignoring any close-time IOException. */
private static void closeQuietly(InputStream in) {
    if (in != null) {
        try {
            in.close();
        } catch (IOException ignored) {
            // best-effort close; nothing useful to do here
        }
    }
}

loadFunctions方法

参考下配置中的function标签配置:

<function name="hash-int"
class="io.mycat.route.function.PartitionByFileMap">
<property name="mapFile">partition-hash-int.txt</property>
</function>

下面是读取function的java代码

/**
 * Parses every &lt;function&gt; element under the given root: instantiates
 * the configured partitioning algorithm, applies its &lt;property&gt; values,
 * initializes it, and registers it by name.
 *
 * @throws ConfigException if two functions share the same name
 */
private void loadFunctions(Element root) throws ClassNotFoundException,
        InstantiationException, IllegalAccessException,
        InvocationTargetException {
    NodeList nodes = root.getElementsByTagName("function");
    int count = nodes.getLength();
    for (int idx = 0; idx < count; ++idx) {
        Node item = nodes.item(idx);
        if (!(item instanceof Element)) {
            continue;
        }
        Element funcElement = (Element) item;
        // Function names are unique keys into the registry.
        String name = funcElement.getAttribute("name");
        if (functions.containsKey(name)) {
            throw new ConfigException("rule function " + name
                    + " duplicated!");
        }
        // Reflectively build the algorithm from its configured class;
        // it must extend AbstractPartitionAlgorithm.
        String clazz = funcElement.getAttribute("class");
        AbstractPartitionAlgorithm function = createFunction(name, clazz);
        // Copy each <property> key/value onto the matching algorithm field.
        ParameterMapping.mapping(function, ConfigUtil.loadElements(funcElement));
        // Give the algorithm a chance to initialize itself.
        function.init();
        functions.put(name, function);
    }
}

当前版本实现的分片路由算法有如下几种类型:

  • AutoPartitionByLong
  • LatestMonthPartion
  • PartitionByCRC32PreSlot
  • PartitionByDate
  • PartitionByFileMap
  • PartitionByHashMod
  • PartitionByHotDate
  • PartitionByJumpConsistentHash
  • PartitionByLong
  • PartitionByMod
  • PartitionByMonth
  • PartitionByMurmurHash
  • PartitionByPattern
  • PartitionByPrefixPattern
  • PartitionByRangeDateHash
  • PartitionByRangeMod
  • PartitionByString
  • PartitionDirectBySubString

loadTableRules方法

配置文件中配置的tableRule标签

<tableRule name="rule1">
<rule>
<columns>id</columns>
<algorithm>func1</algorithm>
</rule>
</tableRule>

对应java解析代码如下:

/**
 * Parses every &lt;tableRule&gt; element: validates name uniqueness, reads
 * its single &lt;rule&gt; child, resolves the referenced partitioning
 * function, and stores the completed TableRuleConfig.
 *
 * @throws ConfigException if a name repeats, more than one rule is defined,
 *                         or the referenced function does not exist
 */
private void loadTableRules(Element root) throws SQLSyntaxErrorException {
    NodeList ruleList = root.getElementsByTagName("tableRule");
    for (int idx = 0, total = ruleList.getLength(); idx < total; ++idx) {
        Node item = ruleList.item(idx);
        if (!(item instanceof Element)) {
            continue;
        }
        Element tableRuleElement = (Element) item;
        // Rule names must be unique.
        String name = tableRuleElement.getAttribute("name");
        if (tableRules.containsKey(name)) {
            throw new ConfigException("table rule " + name
                    + " duplicated!");
        }
        // Exactly one <rule> child is allowed per tableRule.
        NodeList ruleNodes = tableRuleElement.getElementsByTagName("rule");
        if (ruleNodes.getLength() > 1) {
            throw new ConfigException("only one rule can defined :"
                    + name);
        }
        // Only the first rule is used today; multi-column composite rules
        // may come later. RuleConfig binds the rule to its function name.
        RuleConfig rule = loadRule((Element) ruleNodes.item(0));
        // The rule must reference an already-loaded function.
        String funName = rule.getFunctionName();
        AbstractPartitionAlgorithm func = functions.get(funName);
        if (func == null) {
            throw new ConfigException("can't find function of name :"
                    + funName);
        }
        rule.setRuleAlgorithm(func);
        tableRules.put(name, new TableRuleConfig(name, rule));
    }
}

顺便看下tableRule下的rule标签的解析

loadRule((Element) ruleNodes.item(0))
/**
 * Parses a single &lt;rule&gt; element into a RuleConfig.
 *
 * <p>A rule binds exactly one sharding column to one algorithm; supplying a
 * comma-separated column list is rejected up front.
 *
 * @param element the &lt;rule&gt; element (children: &lt;columns&gt;, &lt;algorithm&gt;)
 * @return a RuleConfig holding the upper-cased column and the algorithm name
 * @throws ConfigException if more than one column is configured
 */
private RuleConfig loadRule(Element element) throws SQLSyntaxErrorException {
    // Sharding column name.
    Element columnsEle = ConfigUtil.loadElement(element, "columns");
    String column = columnsEle.getTextContent();
    // Only a single column per rule is supported.
    String[] columns = SplitUtil.split(column, ',', true);
    if (columns.length > 1) {
        // Fixed typo in the user-facing message ("coulmns" -> "columns").
        throw new ConfigException("table rule columns has multi values:"
                + columnsEle.getTextContent());
    }
    // Name of the <function> this rule delegates to.
    Element algorithmEle = ConfigUtil.loadElement(element, "algorithm");
    String algorithm = algorithmEle.getTextContent();
    return new RuleConfig(column.toUpperCase(), algorithm);
}

XMLSchemaLoader

接下来可以详细看下表配置文件对表信息和数据连接信息的加载主要代码如下:

//先加载所有的DataHost
loadDataHosts(root);
//再加载所有的DataNode
loadDataNodes(root);
//最后加载所有的Schema
loadSchemas(root);

loadDataHosts

loadDataHosts加载数据库连接信息,用来配置数据库连接参数比如连接地址,数据库负载均衡策略,连接数等

在看解析过程之前可以先看下需要解析的xml标签如下:

<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- can have multi write hosts -->
<writeHost host="hostM1" url="localhost:3306" user="root"
password="123456">
<!-- can have multi read hosts -->
<readHost host="hostS2" url="192.168.1.200:3306" user="root" password="xxx" />
</writeHost>
<writeHost host="hostS1" url="localhost:3316" user="root"
password="123456" />
<!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
</dataHost>

java代码解析过程如下:

/**
 * Parses every &lt;dataHost&gt; element from schema.xml into a DataHostConfig.
 *
 * <p>A dataHost describes one physical replication group: its writeHost
 * list, each writeHost's nested readHost list, connection-pool sizing, the
 * read/write balance policy, the failover switch type and the heartbeat SQL.
 *
 * @param root the document element of schema.xml
 * @throws ConfigException on duplicate dataHost/writeHost/readHost names
 */
private void loadDataHosts(Element root) {
    NodeList list = root.getElementsByTagName("dataHost");
    for (int i = 0, n = list.getLength(); i < n; ++i) {
        Element element = (Element) list.item(i);
        // "name" uniquely identifies this host group.
        String name = element.getAttribute("name");
        if (dataHosts.containsKey(name)) {
            // Fixed missing space in the message for consistency with the
            // other "duplicated!" errors.
            throw new ConfigException("dataHost name " + name + " duplicated!");
        }
        // Connection-pool bounds.
        int maxCon = Integer.parseInt(element.getAttribute("maxCon"));
        int minCon = Integer.parseInt(element.getAttribute("minCon"));
        /*
         * Read/write balance policy:
         *  0 - no read/write splitting; all reads go to the current writeHost
         *  1 - all readHosts plus stand-by writeHosts serve SELECTs
         *  2 - reads are spread randomly over writeHosts and readHosts
         *  3 - reads go only to the writeHost's readHosts (writer takes no read load)
         */
        int balance = Integer.parseInt(element.getAttribute("balance"));
        /*
         * Failover switch type:
         *  -1 - never switch automatically
         *   1 - default, automatic switch
         *   2 - decide by MySQL replication state ("show slave status")
         *   3 - switch policy for MySQL Galera cluster
         */
        String switchTypeStr = element.getAttribute("switchType");
        int switchType = switchTypeStr.equals("") ? -1 : Integer.parseInt(switchTypeStr);
        // Replication-lag threshold used when picking read candidates.
        String slaveThresholdStr = element.getAttribute("slaveThreshold");
        int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.parseInt(slaveThresholdStr);

        // > 0 keeps reads available on readHosts even while the writer is down.
        String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
        boolean tempReadHostAvailable = !tempReadHostAvailableStr.equals("") && Integer.parseInt(tempReadHostAvailableStr) > 0;
        /*
         * writeType:
         *  0 - all writes go to the first configured writeHost; a failover to
         *      the next one is persisted in dnindex.properties
         *  1 - writes spread randomly over all writeHosts (deprecated since 1.5)
         */
        String writeTypStr = element.getAttribute("writeType");
        int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.parseInt(writeTypStr);
        String dbDriver = element.getAttribute("dbDriver");
        String dbType = element.getAttribute("dbType");
        String filters = element.getAttribute("filters");
        String logTimeStr = element.getAttribute("logTime");
        String slaveIDs = element.getAttribute("slaveIDs");
        long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.parseLong(logTimeStr);
        // Heartbeat SQL (<heartbeat> child element).
        String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();
        // Optional connection-init SQL (mainly for Oracle).
        NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
        String initConSQL = null;
        if (connectionInitSqlList.getLength() > 0) {
            initConSQL = connectionInitSqlList.item(0).getTextContent();
        }
        // Parse the writeHost list; readHosts are always nested in a writeHost.
        NodeList writeNodes = element.getElementsByTagName("writeHost");
        DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
        Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2);
        Set<String> writeHostNameSet = new HashSet<String>(writeNodes.getLength());
        for (int w = 0; w < writeDbConfs.length; w++) {
            Element writeNode = (Element) writeNodes.item(w);
            writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon, filters, logTime);
            // writeHost host names must be unique within the dataHost.
            if (writeHostNameSet.contains(writeDbConfs[w].getHostName())) {
                throw new ConfigException("writeHost " + writeDbConfs[w].getHostName() + " duplicated!");
            } else {
                writeHostNameSet.add(writeDbConfs[w].getHostName());
            }
            NodeList readNodes = writeNode.getElementsByTagName("readHost");
            if (readNodes.getLength() != 0) {
                DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];
                Set<String> readHostNameSet = new HashSet<String>(readNodes.getLength());
                for (int r = 0; r < readDbConfs.length; r++) {
                    Element readNode = (Element) readNodes.item(r);
                    // BUGFIX: must go through the createDBHostConf factory, as
                    // in the writeHost branch above (the previous text invoked
                    // "DBHostConfig(...)" directly, which does not compile).
                    readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon, filters, logTime);
                    // readHost host names must be unique within this writeHost.
                    if (readHostNameSet.contains(readDbConfs[r].getHostName())) {
                        throw new ConfigException("readHost " + readDbConfs[r].getHostName() + " duplicated!");
                    } else {
                        readHostNameSet.add(readDbConfs[r].getHostName());
                    }
                }
                // Cache the read hosts under their writeHost's index.
                readHostsMap.put(w, readDbConfs);
            }
        }
        // Assemble the final DataHostConfig and register it by name.
        DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver,
                writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);

        hostConf.setMaxCon(maxCon);
        hostConf.setMinCon(minCon);
        hostConf.setBalance(balance);
        hostConf.setWriteType(writeType);
        hostConf.setHearbeatSQL(heartbeatSQL);
        hostConf.setConnectionInitSql(initConSQL);
        hostConf.setFilters(filters);
        hostConf.setLogTime(logTime);
        hostConf.setSlaveIDs(slaveIDs);
        dataHosts.put(hostConf.getName(), hostConf);
    }
}

loadDataNodes

//再加载所有的DataNode

loadDataNodes(root);

在解析DataNode之前先来看下常见的DataNode标签配置信息:

<!-- <dataNode name="dn1$0-743" dataHost="localhost1" database="db$0-743"
/> -->
<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />

loadDataNodes解析标签源码

/**
 * Parses every &lt;dataNode&gt; element. The name, database and dataHost
 * attributes each accept multi-value expressions (',', '$', '-'); when a
 * name expands to several values, the name count must equal
 * database count * host count, and every host owns every database.
 *
 * @throws ConfigException on missing attributes or a count mismatch
 */
private void loadDataNodes(Element root) {
    NodeList nodeList = root.getElementsByTagName("dataNode");
    for (int idx = 0, total = nodeList.getLength(); idx < total; idx++) {
        Element element = (Element) nodeList.item(idx);
        String dnNamePre = element.getAttribute("name");
        String databaseStr = element.getAttribute("database");
        String host = element.getAttribute("dataHost");
        // All three attributes are mandatory.
        if (empty(dnNamePre) || empty(databaseStr) || empty(host)) {
            throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty");
        }
        // Expand multi-value expressions, e.g.
        //   <dataNode name="dn1$0-75" dataHost="localhost$1-10" database="db$0-759"/>
        String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-');
        String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-');
        String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-');

        if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) {
            throw new ConfigException("dataNode " + dnNamePre
                    + " define error ,dnNames.length must be=databases.length*hostStrings.length");
        }

        if (dnNames.length <= 1) {
            // Plain single-node definition.
            createDataNode(dnNamePre, databaseStr, host);
            continue;
        }
        // Pair each expanded name with its (host, database) combination and
        // register one DataNodeConfig per pair.
        List<String[]> hostDbPairs = mergerHostDatabase(hostStrings, databases);
        for (int k = 0; k < dnNames.length; k++) {
            String[] pair = hostDbPairs.get(k);
            createDataNode(dnNames[k], pair[1], pair[0]);
        }
    }
}

loadSchemas 解析schema配置信息

在查看配置加载之前先来看一个schema配置示例

<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<!-- auto sharding by id (long) -->
<table name="travelrecord" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<!-- global table is auto cloned to all defined data nodes ,so can join
with any table whose sharding node is in the same data node -->
<table name="company" primaryKey="ID" type="global" dataNode="dn1,dn2,dn3" />
<table name="goods" primaryKey="ID" type="global" dataNode="dn1,dn2" />
<!-- random sharding using mod sharind rule -->
<table name="hotnews" primaryKey="ID" autoIncrement="true" dataNode="dn1,dn2,dn3"
rule="mod-long" />
<!-- <table name="dual" primaryKey="ID" dataNode="dnx,dnoracle2" type="global"
needAddLimit="false"/> <table name="worker" primaryKey="ID" dataNode="jdbc_dn1,jdbc_dn2,jdbc_dn3"
rule="mod-long" /> -->
<table name="employee" primaryKey="ID" dataNode="dn1,dn2"
rule="sharding-by-intfile" />
<table name="customer" primaryKey="ID" dataNode="dn1,dn2"
rule="sharding-by-intfile">
<childTable name="orders" primaryKey="ID" joinKey="customer_id"
parentKey="id">
<childTable name="order_items" joinKey="order_id"
parentKey="id" />
</childTable>
<childTable name="customer_addr" primaryKey="ID" joinKey="customer_id"
parentKey="id" />
</table>
<!-- <table name="oc_call" primaryKey="ID" dataNode="dn1$0-743" rule="latest-month-calldate"
/> -->
</schema>

接下来看下表配置信息的解析

private void loadSchemas(Element root) {
//读取schema根节点
NodeList list = root.getElementsByTagName("schema");
//遍历schema节点列表
for (int i = 0, n = list.getLength(); i < n; i++) {
Element schemaElement = (Element) list.item(i);
//读取各个属性 配置逻辑库的名字
String name = schemaElement.getAttribute("name");
//配置该逻辑库默认的分片。 没有通过 table 标签配置的表,就会走到此处配置的默认的分片上。
String dataNode = schemaElement.getAttribute("dataNode");
//检查发给Mycat的SQL中是否含有库名。boolean类型。 当checkSQLschema=ture时,Mycat来检查SQL中是否有dbName.tableName,如果有,则会删除dbName,
默认false,即不检查。
String checkSQLSchemaStr = schemaElement.getAttribute("checkSQLschema");
//sqlMaxLimit 限制每次查询数据所返回的最大行数。
(server.xml中的limit是整个mycat系统的默认值,这里则是当前逻辑库的默认值,默认先看schema.xml的限制数)
String sqlMaxLimitStr = schemaElement.getAttribute("sqlMaxLimit");
int sqlMaxLimit = -1;
//读取sql返回结果集限制
if (sqlMaxLimitStr != null && !sqlMaxLimitStr.isEmpty()) {
sqlMaxLimit = Integer.parseInt(sqlMaxLimitStr);
}

// check dataNode already exists or not,看schema标签中是否有datanode
String defaultDbType = null;
//校验检查并添加dataNode 配置了默认的逻辑主机则检查默认的逻辑主机配置是否存在,不存在则抛出异常
if (dataNode != null && !dataNode.isEmpty()) {
List<String> dataNodeLst = new ArrayList<String>(1);
dataNodeLst.add(dataNode);
checkDataNodeExists(dataNodeLst);
String dataHost = dataNodes.get(dataNode).getDataHost();
defaultDbType = dataHosts.get(dataHost).getDbType();
} else {
dataNode = null;
}
//加载schema下所有tables
Map<String, TableConfig> tables = loadTables(schemaElement);
//判断schema是否重复 逻辑库是不能重复的
if (schemas.containsKey(name)) {
throw new ConfigException("schema " + name + " duplicated!");
}

// 设置了table的不需要设置dataNode属性,没有设置table的必须设置dataNode属性,每个table必须有逻辑主机配置
if (dataNode == null && tables.size() == 0) {
throw new ConfigException(
"schema " + name + " didn't config tables,so you must set dataNode property!");
}

SchemaConfig schemaConfig = new SchemaConfig(name, dataNode,
tables, sqlMaxLimit, "true".equalsIgnoreCase(checkSQLSchemaStr));

//设定DB类型,这对之后的sql语句路由解析有帮助 默认是mysql非默认的则记录一下
if (defaultDbType != null) {
schemaConfig.setDefaultDataNodeDbType(defaultDbType);
if (!"mysql".equalsIgnoreCase(defaultDbType)) {
schemaConfig.setNeedSupportMultiDBType(true);
}
}

// 判断是否有不是mysql的数据库类型,方便解析判断是否启用多数据库分页语法解析
for (TableConfig tableConfig : tables.values()) {
if (isHasMultiDbType(tableConfig)) {
schemaConfig.setNeedSupportMultiDBType(true);
break;
}
}
//记录每种dataNode的DB类型
Map<String, String> dataNodeDbTypeMap = new HashMap<>();
for (String dataNodeName : dataNodes.keySet()) {
DataNodeConfig dataNodeConfig = dataNodes.get(dataNodeName);
String dataHost = dataNodeConfig.getDataHost();
DataHostConfig dataHostConfig = dataHosts.get(dataHost);
if (dataHostConfig != null) {
String dbType = dataHostConfig.getDbType();
dataNodeDbTypeMap.put(dataNodeName, dbType);
}
}
//设置当前逻辑库的数据库类型
schemaConfig.setDataNodeDbTypeMap(dataNodeDbTypeMap);
//缓存逻辑库配置
schemas.put(name, schemaConfig);
}
}

loadTables解析table标签配置

/**
 * Parses every <table> element under a <schema> into TableConfig entries.
 * Handles name-suffix expansion for date-partitioned tables, global vs.
 * normal table types, the distribute(...) dataNode re-ordering syntax,
 * per-table rule specialization (TableRuleAware), sub-tables, and nested
 * childTable (ER) definitions.
 */
private Map<String, TableConfig> loadTables(Element node) {

// Table names may contain back-quotes [`]; TableConfigMap normalizes them.
Map<String, TableConfig> tables = new TableConfigMap();
NodeList nodeList = node.getElementsByTagName("table");
for (int i = 0; i < nodeList.getLength(); i++) {
Element tableElement = (Element) nodeList.item(i);
// Logical table name(s), upper-cased.
String tableNameElement = tableElement.getAttribute("name").toUpperCase();

// TODO: routing — dynamic date-suffixed table support. A nameSuffix
// expands a single name into date-stamped physical names.
String tableNameSuffixElement = tableElement.getAttribute("nameSuffix").toUpperCase();
if ( !"".equals( tableNameSuffixElement ) ) {

// nameSuffix only works with a single (non comma-separated) name.
if( tableNameElement.split(",").length > 1 ) {
throw new ConfigException("nameSuffix " + tableNameSuffixElement + ", require name parameter cannot multiple breaks!");
}
// The suffix encodes the date pattern used for expansion.
tableNameElement = doTableNameSuffix(tableNameElement, tableNameSuffixElement);
}
// A comma-separated name declares several tables sharing one config.
String[] tableNames = tableNameElement.split(",");
// Primary key of the backing physical table (used for routing analysis
// and auto-increment support).
String primaryKey = tableElement.hasAttribute("primaryKey") ? tableElement.getAttribute("primaryKey").toUpperCase() : null;
// autoIncrement=true enables the global sequence handler for this table
// (without it, a missing sharding key raises an error); defaults to false.
boolean autoIncrement = false;
if (tableElement.hasAttribute("autoIncrement")) {
autoIncrement = Boolean.parseBoolean(tableElement.getAttribute("autoIncrement"));
}
// Whether to append a row limit to queries (works with the schema's
// sqlMaxLimit); defaults to true.
boolean needAddLimit = true;
if (tableElement.hasAttribute("needAddLimit")) {
needAddLimit = Boolean.parseBoolean(tableElement.getAttribute("needAddLimit"));
}
// Table type: "global" (replicated to every data node) or normal
// (anything else); only these two types exist in this version.
String tableTypeStr = tableElement.hasAttribute("type") ? tableElement.getAttribute("type") : null;
int tableType = TableConfig.TYPE_GLOBAL_DEFAULT;
if ("global".equalsIgnoreCase(tableTypeStr)) {
tableType = TableConfig.TYPE_GLOBAL_TABLE;
}
// Data nodes this table is distributed over.
String dataNode = tableElement.getAttribute("dataNode");
TableRuleConfig tableRule = null;
// Sharding rule reference; the rule itself lives in rule.xml.
if (tableElement.hasAttribute("rule")) {
String ruleName = tableElement.getAttribute("rule");
tableRule = tableRules.get(ruleName);
if (tableRule == null) {
throw new ConfigException("rule " + ruleName + " is not found!");
}
}

// Whether a sharding rule is mandatory for this table.
boolean ruleRequired = false;
if (tableElement.hasAttribute("ruleRequired")) {
ruleRequired = Boolean.parseBoolean(tableElement.getAttribute("ruleRequired"));
}

if (tableNames == null) {
throw new ConfigException("table name is not found!");
}
// Mycat 1.4+: a dataNode value of distribute(dn1$0-372,dn2$0-372)
// re-orders the listed dataNodes so they spread evenly across dataHosts;
// strip the wrapper here and re-distribute after the TableConfig exists.
String distPrex = "distribute(";
boolean distTableDns = dataNode.startsWith(distPrex);
if (distTableDns) {
dataNode = dataNode.substring(distPrex.length(), dataNode.length() - 1);
}
// Sub-table (sharding within one node) support, e.g.
// subTables="t_order$1-2,t_order3".
String subTables = tableElement.getAttribute("subTables");

for (int j = 0; j < tableNames.length; j++) {

String tableName = tableNames[j];
TableRuleConfig tableRuleConfig=tableRule ;
if(tableRuleConfig!=null) {
// TableRuleAware functions keep per-table state, so each table gets
// its own cloned rule instance registered under a derived name.
RuleConfig rule= tableRuleConfig.getRule();
if(rule.getRuleAlgorithm() instanceof TableRuleAware) {
tableRuleConfig = (TableRuleConfig) ObjectUtil.copyObject(tableRuleConfig);
tableRules.remove(tableRuleConfig.getName()) ;
String newRuleName = tableRuleConfig.getName() + "_" + tableName;
tableRuleConfig. setName(newRuleName);
TableRuleAware tableRuleAware= (TableRuleAware) tableRuleConfig.getRule().getRuleAlgorithm();
tableRuleAware.setRuleName(newRuleName);
tableRuleAware.setTableName(tableName);
tableRuleConfig.getRule().getRuleAlgorithm().init();
tableRules.put(newRuleName,tableRuleConfig);
}
}

TableConfig table = new TableConfig(tableName, primaryKey,
autoIncrement, needAddLimit, tableType, dataNode,
getDbType(dataNode),
(tableRuleConfig != null) ? tableRuleConfig.getRule() : null,
ruleRequired, null, false, null, null,subTables);

// Every referenced dataNode must exist.
checkDataNodeExists(table.getDataNodes());
// Validate that the sharding rule fits this table's node count.
if(table.getRule() != null) {
checkRuleSuitTable(table);
}

// Apply the distribute(...) re-ordering, if requested.
if (distTableDns) {
distributeDataNodes(table.getDataNodes());
}
// Logical table names must be unique within the schema.
if (tables.containsKey(table.getName())) {
throw new ConfigException("table " + tableName + " duplicated!");
}
tables.put(table.getName(), table);
}
// childTable (ER) definitions are only allowed when the name attribute
// declared a single table; child rows are co-located with their parent
// via the parentKey lookup.
if (tableNames.length == 1) {
TableConfig table = tables.get(tableNames[0]);
// process child tables
processChildTables(tables, table, dataNode, tableElement);
}
}
return tables;
}



举报

相关推荐

0 条评论