简介:轻量封装Ibatis3
因为本人在国内最大的电子商务公司工作期间,深感一个好的分表分库框架可以大大提高系统的承载能力及系统的灵活性,而一个不好的分表分库方案,则让系统在大数据量处理的时候非常郁闷。所以, 在根据笔者在从事电子商务开发的这几年中,对各个应用场景而开发的一个轻量封装Ibatis3的一个分表分库框架。
笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-ddl(基于Mybaits的分表分库框架,已经应用国内某移动互联网公司);相继缓存方案、和消息系统解决方案也会慢慢开源。 Summercool框架做为笔者的第一个开源框架
summercool-hsf:http://summercool-hsf.googlecode.com/svn/trunk
1. 什么是HSF框架
HSF框架是一个高性能远程通信框架,底层基于Netty实现TCP通信,对上层进行封装,提供易于使用和高度可扩展能力。
名词解译:
1)Channel:可以理解为一个通道,或者连接
2)ChannelGroup:多个通道组合成为一个ChannelGroup
2.HSF工作流程
3.消息协议设计
消息协议这里是指对消息编码和解码的规范的一种定义,HSF内置的消息协议采用如下结构:
Length:以4个字节表示,是指ID + Content的长度。
ID:以1个字节表示,1表示Content部分被压缩,0表示未被压缩。
Content:真实的消息内容。
4.处理器
Netty框架原生提供一个处理器链对事件进行处理,每个处理器均实现ChannelHandler接口,ChannelHandler是个空接口,拥有三个子接口:ChannelDownstreamHandler, ChannelUpstreamHandler和LifeCycleAwareChannelHandler。这里我们主要关注前两个接口,因为它们被用来处理读与写的消息。
事件主要分为三种:ChannelEvent、MessageEvent和ExceptionEvent,一旦这些事件被触发,它们将从处理器链的一端到另一端,被逐个处理器处理,注意,整个过程是单线程场景。一般而言,ChannelEvent和ExceptionEvent事件都是从底层被触发,因此,它们会被ChannelUpstreamHandler处理。而MessageEvent则需要根据读与写方式的不同,分别从两个方向被ChannelUpstreamHandler和ChannelDownstreamHandler处理。
HSF内置的编(解)码处理器、压缩(解压)处理器及序列化(反序列化)处理器等都是直接或间接实现ChannelHandler。
♦ ChannelDownstreamHandler
Java代码
1. public interface ChannelDownstreamHandler extends
2. /**
3. * Handles the specified downstream event.
4. *
5. * @param ctx the context object for this handler
6. * @param e the downstream event to process or intercept
7. */
8. void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
9. }
♦ ChannelUpstreamHandler
Java代码
1. public interface ChannelUpstreamHandler extends
2. /**
3. * Handles the specified upstream event.
4. *
5. * @param ctx the context object for this handler
6. * @param e the upstream event to process or intercept
7. */
8. void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws
9. }
4.1.Encoding流程
HSF内置的encoding过程由三个Handler组合完成,流程如下:
1) SerializeDownstreamHandler
Java代码
1. /**
2. * @Title: SerializeDownstreamHandler.java
3. * @Package com.gexin.hsf.netty.channelhandler.downstream
4. * @Description: 序列化
5. * @author
6. * @date 2011-9-16 下午4:45:59
7. * @version V1.0
8. */
9. public class SerializeDownstreamHandler implements
10. Logger logger = LoggerFactory.getLogger(getClass());
11. private Serializer serializer = new
12. public
13. }
14. public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
15. if (!(e instanceof
16. ctx.sendDownstream(e);
17. return;
18. }
19. MessageEvent event = (MessageEvent) e;
20. Object originalMessage = event.getMessage();
21. Object encodedMessage = originalMessage;
22. if (!(originalMessage instanceof
23. encodedMessage = serializer.serialize(originalMessage);
24. else
25. encodedMessage = Heartbeat.BYTES;
26. }
27. if
28. ctx.sendDownstream(e);
29. else if (encodedMessage != null) {
30. write(ctx, e.getFuture(), encodedMessage, event.getRemoteAddress());
31. }
32. }
33. public void
34. this.serializer = serializer;
35. }
36. }
2)CompressionDownstreamHandler
Java代码
1. /**
2. * @Title: CompressionDownstreamHandler.java
3. * @Package com.gexin.hsf.netty.channelhandler.downstream
4. * @Description: 压缩处理器
5. * @author
6. * @date 2011-9-16 下午4:45:59
7. * @version V1.0
8. */
9. public class CompressionDownstreamHandler implements
10. private CompressionStrategy compressionStrategy = new
11. public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
12. if (!(e instanceof
13. ctx.sendDownstream(e);
14. return;
15. }
16. MessageEvent event = (MessageEvent) e;
17. Object originalMessage = event.getMessage();
18. if (originalMessage instanceof byte[]) {
19. byte[]) originalMessage);
20. byte[] resBuffer = compressionResult.getBuffer();
21. int
22. byte[] bytes = new byte[length + 1];
23. 0] = compressionResult.isCompressed() ? (byte) 1 : (byte) 0;
24. for (int i = 0; i < length; i++) {
25. 1] = resBuffer[i];
26. }
27. new
28. event.getRemoteAddress());
29. ctx.sendDownstream(evt);
30. else
31. ctx.sendDownstream(e);
32. }
33. }
34. public void
35. this.compressionStrategy = compressionStrategy;
36. }
37. }
3)LengthBasedEncoder
Java代码
1. /**
2. * @ClassName: LengthBasedEncoder
3. * @Description: 基于长度的编码器
4. * @author
5. * @date 2011-9-29 下午1:43:41
6. *
7. */
8. public class LengthBasedEncoder extends
9. Logger logger = LoggerFactory.getLogger(getClass());
10. private final int
11. public
12. this(512);
13. }
14. public LengthBasedEncoder(int
15. if (estimatedLength < 0) {
16. throw new IllegalArgumentException("estimatedLength: "
17. }
18. this.estimatedLength = estimatedLength;
19. }
20. @Override
21. protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws
22. if (msg instanceof byte[]) {
23. byte[] bytes = (byte[]) msg;
24. ChannelBuffer ob = ChannelBuffers.dynamicBuffer(estimatedLength, channel.getConfig().getBufferFactory());
25. ob.writeInt(bytes.length);
26. ob.writeBytes(bytes);
27. return
28. else
29. throw new IllegalArgumentException("msg must be a byte[], but "
30. }
31. }
32. }
4.2.Decoding流程
decoding流程与encoding正好相反,流程如下:
1)LengthBasedDecoder
对于TCP通信而言,粘包是很正常的现象,因此decoder必须处理粘包问题。HsfFrameDecoder是一个支持粘包处理的decoder类抽象。
Java代码
1. /**
2. * @ClassName: LengthBasedDecoder
3. * @Description: 基于长度的解码器
4. * @author
5. * @date 2011-9-29 下午1:42:59
6. *
7. */
8. public class LengthBasedDecoder extends
9. private
10. private int headerFieldLength = 4;
11. public
12. this(4);
13. }
14. public LengthBasedDecoder(int
15. this.headerFieldLength = headerFieldLength;
16. }
17. @Override
18. protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws
19. if
20. buffer.markReaderIndex();
21. int
22. if (length < 0) {
23. "msg length must >= 0. but length={}", length);
24. return null;
25. else if (length == 0) {
26. return
27. else if
28. byte[] bytes = new byte[length];
29. buffer.readBytes(bytes);
30. return
31. else
32. buffer.resetReaderIndex();
33. }
34. }
35. return null;
36. }
37. }
2)DecompressionUpstreamHandler
Java代码
1. /**
2. * @Title: DecompressionUpstreamHandler.java
3. * @Package com.gexin.hsf.netty.channelhandler.downstream
4. * @Description: 解压缩处理器
5. * @author
6. * @date 2011-9-16 下午4:45:59
7. * @version V1.0
8. */
9. public class DecompressionUpstreamHandler extends
10. private CompressionStrategy compressionStrategy = new
11. @Override
12. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
13. if (e.getMessage() instanceof byte[]) {
14. byte[] bytes = (byte[]) e.getMessage();
15. int
16. if (length > 0) {
17. byte[] buffer = new byte[length - 1];
18. for (int i = 1; i < length; i++) {
19. 1] = bytes[i];
20. }
21. if (bytes[0] == 1) {
22. buffer = compressionStrategy.decompress(buffer);
23. }
24. new
25. super.messageReceived(ctx, event);
26. }
27. else
28. super.messageReceived(ctx, e);
29. }
30. }
31. public void
32. this.compressionStrategy = compressionStrategy;
33. }
34. }
3)DeserializeUpstreamHandler
Java代码
1. /**
2. * @Title: DeserializeUpstreamHandler.java
3. * @Package com.gexin.hsf.netty.channelhandler.downstream
4. * @Description: 反序列化
5. * @author
6. * @date 2011-9-16 下午4:45:59
7. * @version V1.0
8. */
9. public class DeserializeUpstreamHandler extends
10. private
11. private Serializer serializer = new
12. @Override
13. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
14. if (e.getMessage() == null) {
15. return;
16. else if (e.getMessage() instanceof byte[]) {
17. byte[] bytes = (byte[]) e.getMessage();
18. Object msg;
19. if (bytes.length == 0) {
20. msg = Heartbeat.getSingleton();
21. else
22. try
23. msg = serializer.deserialize(bytes);
24. catch
25. throw
26. }
27. }
28. new
29. super.messageReceived(ctx, event);
30. else
31. super.messageReceived(ctx, e);
32. }
33. }
34. public void
35. this.serializer = serializer;
36. }
37. }
4.3.处理器链的建立
HSF使用如下的方式构建处理器链:
Java代码
1. bootstrap.setPipelineFactory(new
2. public ChannelPipeline getPipeline() throws
3. ChannelPipeline pipeline = Channels.pipeline();
4. // 注册各种自定义Handler
5. for
6. pipeline.addLast(key, handlers.get(key));
7. }
8. // 注册链路空闲检测Handler
9. Integer writeIdleTime = LangUtil.parseInt(options.get(HsfOptions.WRITE_IDLE_TIME));
10. Integer readIdleTime = LangUtil.parseInt(options.get(HsfOptions.READ_IDLE_TIME));
11. if (writeIdleTime == null) {
12. 10;
13. }
14. if (readIdleTime == null) {
15. // 默认为写空闲的3倍
16. 3;
17. }
18. "timeout", new IdleStateHandler(idleTimer, readIdleTime, writeIdleTime, 0));
19. "idleHandler", new StateCheckChannelHandler(HsfAcceptorImpl.this));
20. // 注册事件分发Handler
21. "dispatchHandler", new
22. return
23. }
24. });
5.Dispatcher
消息经过Handler链处理后,将被Dispatcher转发,并进入EventListener链处理。
Dispatcher内置两个线程池:channelExecutor和msgExecutor。
channelExecutor用于处理通道事件和异常事件,考虑到在通道事件可能需要同步调用远程服务,因此此线程池不设上线(因为同步调用将会阻塞当前线程)。
msgExecutor用于处理消息事件,根据经验值,缺省最大线程数为150,该值可以通过Option参数修改。
6.EventListener
EventListener有以下三种:
1)ChannelEventListener
Java代码
1. /**
2. * @Title: ChannelEventListener.java
3. * @Package com.gexin.hsf.netty.listener
4. * @Description: 通道事件监听类
5. * @author
6. * @date 2011-9-27 上午11:45:50
7. * @version V1.0
8. */
9. public interface ChannelEventListener extends
10. /**
11. * Invoked when a {@link Channel} was closed and all its related resources were released.
12. *
13. * @author
14. * @param ctx
15. * @param channel
16. * @param e
17. * @return EventBehavior Whether to continue the events deliver
18. */
19. public
20. /**
21. * Invoked when a {@link Channel} is open, bound to a local address, and connected to a remote address.
22. *
23. * @author
24. * @param ctx
25. * @param channel
26. * @param e
27. * @return EventBehavior Whether to continue the events deliver
28. */
29. public
30. /**
31. * Invoked when a group is created.
32. *
33. * @author
34. * @param ctx
35. * @param channel
36. * @param groupName
37. * @return EventBehavior Whether to continue the events deliver
38. */
39. public
40. /**
41. * Invoked when a group is removed.
42. *
43. * @author
44. * @param ctx
45. * @param channel
46. * @param groupName
47. * @return EventBehavior Whether to continue the events deliver
48. */
49. public
50. }
2)MessageEventListener
Java代码
1. /**
2. * @Title: MessageListener.java
3. * @Package com.gexin.hsf.netty.listener
4. * @Description: 消息监听接口
5. * @author
6. * @date 2011-9-27 上午11:36:22
7. * @version V1.0
8. */
9. public interface MessageEventListener extends
10. /**
11. * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
12. * from a remote peer.
13. */
14. public
15. }
3)ExceptionEventListener
Java代码
1. /**
2. * @Title: ExceptionEventListener.java
3. * @Package com.gexin.hsf.netty.listener
4. * @Description: 异常监听接口
5. * @author
6. * @date 2011-9-27 上午11:48:09
7. * @version V1.0
8. */
9. public interface ExceptionEventListener extends
10. /**
11. * Invoked when an exception was raised by an I/O thread or a {@link ChannelHandler}.
12. */
13. public
14. }
Hsf框架会预先在EventListener链末端注册ServiceMessageEventListener,该Listener负责调用被注册的Service,并将返回值或异常回传。
7.Service
1)RemoteServiceContract注解
所有实现了拥有RemoteServiceContract注解的Java类都可以直接注册到HsfService,示例如下:
Java代码
1. @RemoteServiceContract
2. public interface
3. String test(String ctx);
4. }
5.
6. public class TestServiceImpl implements
7. @Override
8. public
9. return String.valueOf("hello "
10. }
11. }
2)ServiceEntry
对于未添加RemoteServiceContract注解的接口,Hsf框架使用org.summercool.hsf.pojo.ServiceEntry类实现注册。
3)注册Service
服务提供方需要向Hsf注册Service方可被远程调用,示例如下:
♦ 注册Service
Java代码
1. HsfAcceptor acceptor = new
2. // 注册Service
3. acceptor.registerService(new
4. // 监听端口
5. acceptor.bind(8082);
♦ 远程调用Service
Java代码
1. HsfConnector connector = new
2. connector.connect(new InetSocketAddress("127.0.0.1",8082));
3. // 同步方式
4. TestService testService = ServiceProxyFactory.getRoundFactoryInstance(connector).wrapSyncProxy(TestService.class);
5. System.out.println(testService.test("HSF"));
3)同步与异步
4)原理
7.Handshake
当通道建立后,Client和Server会进行三次握手,以完成初始化
初次握手步骤
1)Client与Server建立连接成功
2)Client向Server发送握手请求包(handshake request)
3)Server接收到握手请求包后,生成group信息,然后触发groupCreated事件,接着向client发送握手反馈包(handshake ack)
4)Client接收到握手反馈包后,生成group信息,然后触发groupCreated事件,接着向server发送握手完成包(handshake finish)
非初次握手步骤
1)Client与Server建立连接成功
2)Client向Server发送握手请求包(handshake request)
3)Server接收到握手请求包后,添加该连接到Group,接着向client发送握手反馈包(handshake ack)
4)Client接收到握手反馈包后,添加该连接到Group,接着向server发送握手完成包(handshake finish)
以上三次握手所发送的包都只包含本身的group信息,但Hsf对外提供了握手的扩展接口,应用可以使用该接口结合自身的业务,以完成连接建立后的初始化工作。
Client握手扩展接口
8.Heartbeat、超时及重连机制
Heartbeat和超时机制依赖于Netty的读空闲和写空闲回调。
当发生写空闲时,会向对方发送Heartbeat消息,写空闲时间可以通过参数HsfOptions.WRITE_IDLE_TIME设定,缺省为10秒。
当发生读空闲时,即判定为超时,主动关闭连接,读空闲时间可以通过参数HsfOptions.READ_IDLE_TIME设定,缺省为60秒。
对于断开的连接,Hsf会为其重连,重连频率通过HsfOptions.RECONNECT_INTERVAL参数设定,缺省为10000毫秒。
9.Option参数
Hsf支持以参数配置:
参数名 说明 缺省值
HsfOptions.TCP_NO_DELAY | TCP参数,是否关闭延迟发送消息包 | true |
HsfOptions.KEEP_ALIVE | TCP参数,是否保持连接 | true |
HsfOptions.REUSE_ADDRESS | TCP参数,是否重用端口 | false |
HsfOptions.WRITE_IDLE_TIME | 写空闲时间(秒) | 10 |
HsfOptions.READ_IDLE_TIME | 读空闲时间(秒) | 60 |
HsfOptions.SYNC_INVOKE_TIMEOUT | 同步调用超时时间(毫秒) | 60000 |
HsfOptions.HANDSHAKE_TIMEOUT | 握手超时时间(毫秒) | 15000 |
HsfOptions.FLOW_LIMIT | 流量限额 | 2000000 |
HsfOptions.TIMEOUT_WHEN_FLOW_EXCEEDED | 申请流量超时时间(毫秒) | 3000 |
HsfOptions.MAX_THREAD_NUM_OF_DISPATCHER | 分发器的最大线程数 | 150 |
HsfOptions.CHANNEL_NUM_PER_GROUP | 每个Group建立的通道数 | Runtime.getRuntime().availableProcessors() |
HsfOptions.RECONNECT_INTERVAL | 重连频率(毫秒) | 10000 |
HsfOptions.CONNECT_TIMEOUT | 建立连接超时时间(毫秒) | 30000 |
HsfOptions.HOLD_CALLBACK_MESSAGE | 是否缓存Callback方式发送的消息,缓存后将会在发送失败时回调doException方法参数传入 | false |
这些参数可以通过如下方式调整:
Java代码
1. HsfConnector connector = new
2. connector.setOption(HsfOptions.CHANNEL_NUM_PER_GROUP, 1);