0
点赞
收藏
分享

微信扫一扫

纯Java的高性能长连接RPC解决方案

林塬 2023-07-26 阅读 52


简介:轻量封装Ibatis3

         因为本人在国内最大的电子商务公司工作期间,深感一个好的分表分库框架可以大大提高系统的承载能力及系统的灵活性,而一个不好的分表分库方案,则让系统在大数据量处理的时候非常郁闷。所以, 在根据笔者在从事电子商务开发的这几年中,对各个应用场景而开发的一个轻量封装Ibatis3的一个分表分库框架。

         笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-ddl(基于Mybaits的分表分库框架,已经应用国内某移动互联网公司);相继缓存方案、和消息系统解决方案也会慢慢开源。 Summercool框架做为笔者的第一个开源框架

summercool-hsf:http://summercool-hsf.googlecode.com/svn/trunk

 

 

1. 什么是HSF框架

    HSF框架是一个高性能远程通信框架,底层基于Netty实现TCP通信,对上层进行封装,提供易于使用和高度可扩展能力。

 

 

名词解译:

    1)Channel:可以理解为一个通道,或者连接

    2)ChannelGroup:多个通道组合成为一个ChannelGroup

 

2.HSF工作流程

 

3.消息协议设计

    消息协议这里是指对消息编码和解码的规范的一种定义,HSF内置的消息协议采用如下结构:

 

 

 

    Length:以4个字节表示,是指ID + Content的长度。

    ID:以1个字节表示,1表示Content部分被压缩,0表示未被压缩。

    Content:真实的消息内容。

 

 

4.处理器

    Netty框架原生提供一个处理器链对事件进行处理,每个处理器均实现ChannelHandler接口,ChannelHandler是个空接口,拥有三个子接口:ChannelDownstreamHandler, ChannelUpstreamHandler和LifeCycleAwareChannelHandler。这里我们主要关注前两个接口,因为它们被用来处理读与写的消息。
    事件主要分为三种:ChannelEvent、MessageEvent和ExceptionEvent,一旦这些事件被触发,它们将从处理器链的一端到另一端,被逐个处理器处理,注意,整个过程是单线程场景。一般而言,ChannelEvent和ExceptionEvent事件都是从底层被触发,因此,它们会被ChannelUpstreamHandler处理。而MessageEvent则需要根据读与写方式的不同,分别从两个方向被ChannelUpstreamHandler和ChannelDownstreamHandler处理。
    HSF内置的编(解)码处理器、压缩(解压)处理器及序列化(反序列化)处理器等都是直接或间接实现ChannelHandler。

    ♦ ChannelDownstreamHandler

 


Java代码  

1. public interface ChannelDownstreamHandler extends
2. /**
3.      * Handles the specified downstream event.
4.      *
5.      * @param ctx  the context object for this handler
6.      * @param e    the downstream event to process or intercept
7.      */
8. void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
9. }


 

    ♦ ChannelUpstreamHandler

 


Java代码  

1. public interface ChannelUpstreamHandler extends
2. /**
3.      * Handles the specified upstream event.
4.      *
5.      * @param ctx  the context object for this handler
6.      * @param e    the upstream event to process or intercept
7.      */
8. void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws
9. }



 

 

4.1.Encoding流程

     HSF内置的encoding过程由三个Handler组合完成,流程如下:

 

 

 

    1) SerializeDownstreamHandler

 


Java代码  

1. /**
2.  * @Title: SerializeDownstreamHandler.java
3.  * @Package com.gexin.hsf.netty.channelhandler.downstream
4.  * @Description: 序列化
5.  * @author 
6.  * @date 2011-9-16 下午4:45:59
7.  * @version V1.0
8.  */
9. public class SerializeDownstreamHandler implements
10.     Logger logger = LoggerFactory.getLogger(getClass());  
11. private Serializer serializer = new
12. public
13.     }  
14. public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
15. if (!(e instanceof
16.             ctx.sendDownstream(e);  
17. return;  
18.         }  
19.         MessageEvent event = (MessageEvent) e;  
20.         Object originalMessage = event.getMessage();  
21.         Object encodedMessage = originalMessage;  
22. if (!(originalMessage instanceof
23.             encodedMessage = serializer.serialize(originalMessage);  
24. else
25.             encodedMessage = Heartbeat.BYTES;  
26.         }  
27. if
28.             ctx.sendDownstream(e);  
29. else if (encodedMessage != null) {  
30.             write(ctx, e.getFuture(), encodedMessage, event.getRemoteAddress());  
31.         }  
32.     }  
33. public void
34. this.serializer = serializer;  
35.     }  
36. }


      2)CompressionDownstreamHandler

 

 


Java代码  

1. /**
2.  * @Title: CompressionDownstreamHandler.java
3.  * @Package com.gexin.hsf.netty.channelhandler.downstream
4.  * @Description: 压缩处理器
5.  * @author 
6.  * @date 2011-9-16 下午4:45:59
7.  * @version V1.0
8.  */
9. public class CompressionDownstreamHandler implements
10. private CompressionStrategy compressionStrategy = new
11. public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws
12. if (!(e instanceof
13.             ctx.sendDownstream(e);  
14. return;  
15.         }  
16.         MessageEvent event = (MessageEvent) e;  
17.         Object originalMessage = event.getMessage();  
18. if (originalMessage instanceof byte[]) {  
19. byte[]) originalMessage);  
20. byte[] resBuffer = compressionResult.getBuffer();  
21. int
22. byte[] bytes = new byte[length + 1];  
23. 0] = compressionResult.isCompressed() ? (byte) 1 : (byte) 0;  
24. for (int i = 0; i < length; i++) {  
25. 1] = resBuffer[i];  
26.             }  
27. new
28.                     event.getRemoteAddress());  
29.             ctx.sendDownstream(evt);  
30. else
31.             ctx.sendDownstream(e);  
32.         }  
33.     }  
34. public void
35. this.compressionStrategy = compressionStrategy;  
36.     }  
37. }



       3)LengthBasedEncoder

 

 


Java代码  

1. /**
2.  * @ClassName: LengthBasedEncoder
3.  * @Description: 基于长度的编码器
4.  * @author 
5.  * @date 2011-9-29 下午1:43:41
6.  * 
7.  */
8. public class LengthBasedEncoder extends
9.     Logger logger = LoggerFactory.getLogger(getClass());  
10. private final int
11. public
12. this(512);  
13.     }  
14. public LengthBasedEncoder(int
15. if (estimatedLength < 0) {  
16. throw new IllegalArgumentException("estimatedLength: "
17.         }  
18. this.estimatedLength = estimatedLength;  
19.     }  
20. @Override
21. protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws
22. if (msg instanceof byte[]) {  
23. byte[] bytes = (byte[]) msg;  
24.             ChannelBuffer ob = ChannelBuffers.dynamicBuffer(estimatedLength, channel.getConfig().getBufferFactory());  
25.             ob.writeInt(bytes.length);  
26.             ob.writeBytes(bytes);  
27. return
28. else
29. throw new IllegalArgumentException("msg must be a byte[], but "
30.         }  
31.     }  
32. }

 

   4.2.Decoding流程

    decoding流程与encoding正好相反,流程如下:

 

    1)LengthBasedDecoder

    对于TCP通信而言,粘包是很正常的现象,因此decoder必须处理粘包问题。HsfFrameDecoder是一个支持粘包处理的decoder类抽象。


Java代码  

1. /**
2.  * @ClassName: LengthBasedDecoder
3.  * @Description: 基于长度的解码器
4.  * @author 
5.  * @date 2011-9-29 下午1:42:59
6.  * 
7.  */
8. public class LengthBasedDecoder extends
9. private
10. private int headerFieldLength = 4;  
11. public
12. this(4);  
13.     }  
14. public LengthBasedDecoder(int
15. this.headerFieldLength = headerFieldLength;  
16.     }  
17. @Override
18. protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws
19. if
20.             buffer.markReaderIndex();  
21. int
22. if (length < 0) {  
23. "msg length must >= 0. but length={}", length);  
24. return null;  
25. else if (length == 0) {  
26. return
27. else if
28. byte[] bytes = new byte[length];  
29.                 buffer.readBytes(bytes);  
30. return
31. else
32.                 buffer.resetReaderIndex();  
33.             }  
34.         }  
35. return null;  
36.     }  
37. }



     2)DecompressionUpstreamHandler


Java代码  


    1. /**
    2.  * @Title: DecompressionUpstreamHandler.java
    3.  * @Package com.gexin.hsf.netty.channelhandler.downstream
    4.  * @Description: 解压缩处理器
    5.  * @author 
    6.  * @date 2011-9-16 下午4:45:59
    7.  * @version V1.0
    8.  */
    9. public class DecompressionUpstreamHandler extends
    10. private CompressionStrategy compressionStrategy = new
    11. @Override
    12. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
    13. if (e.getMessage() instanceof byte[]) {  
    14. byte[] bytes = (byte[]) e.getMessage();  
    15. int
    16. if (length > 0) {  
    17. byte[] buffer = new byte[length - 1];  
    18. for (int i = 1; i < length; i++) {  
    19. 1] = bytes[i];  
    20.                 }  
    21. if (bytes[0] == 1) {  
    22.                     buffer = compressionStrategy.decompress(buffer);  
    23.                 }  
    24. new
    25. super.messageReceived(ctx, event);  
    26.             }  
    27. else
    28. super.messageReceived(ctx, e);  
    29.         }  
    30.     }  
    31. public void
    32. this.compressionStrategy = compressionStrategy;  
    33.     }  
    34. }



         3)DeserializeUpstreamHandler


    Java代码  


      1. /**
      2.  * @Title: DeserializeUpstreamHandler.java
      3.  * @Package com.gexin.hsf.netty.channelhandler.downstream
      4.  * @Description: 反序列化
      5.  * @author 
      6.  * @date 2011-9-16 下午4:45:59
      7.  * @version V1.0
      8.  */
      9. public class DeserializeUpstreamHandler extends
      10. private
      11. private Serializer serializer = new
      12. @Override
      13. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
      14. if (e.getMessage() == null) {  
      15. return;  
      16. else if (e.getMessage() instanceof byte[]) {  
      17. byte[] bytes = (byte[]) e.getMessage();  
      18.             Object msg;  
      19. if (bytes.length == 0) {  
      20.                 msg = Heartbeat.getSingleton();  
      21. else
      22. try
      23.                     msg = serializer.deserialize(bytes);  
      24. catch
      25. throw
      26.                 }  
      27.             }  
      28. new
      29. super.messageReceived(ctx, event);  
      30. else
      31. super.messageReceived(ctx, e);  
      32.         }  
      33.     }  
      34. public void
      35. this.serializer = serializer;  
      36.     }  
      37. }



          4.3.处理器链的建立

          HSF使用如下的方式构建处理器链:


      Java代码  



        1. bootstrap.setPipelineFactory(new
        2. public ChannelPipeline getPipeline() throws
        3.                 ChannelPipeline pipeline = Channels.pipeline();  
        4. // 注册各种自定义Handler
        5. for
        6.                     pipeline.addLast(key, handlers.get(key));  
        7.                 }  
        8. // 注册链路空闲检测Handler
        9.                 Integer writeIdleTime = LangUtil.parseInt(options.get(HsfOptions.WRITE_IDLE_TIME));  
        10.                 Integer readIdleTime = LangUtil.parseInt(options.get(HsfOptions.READ_IDLE_TIME));  
        11. if (writeIdleTime == null) {  
        12. 10;  
        13.                 }  
        14. if (readIdleTime == null) {  
        15. // 默认为写空闲的3倍
        16. 3;  
        17.                 }  
        18. "timeout", new IdleStateHandler(idleTimer, readIdleTime, writeIdleTime, 0));  
        19. "idleHandler", new StateCheckChannelHandler(HsfAcceptorImpl.this));  
        20. // 注册事件分发Handler
        21. "dispatchHandler", new
        22. return
        23.             }  
        24.         });



        5.Dispatcher

            消息经过Handler链处理后,将被Dispatcher转发,并进入EventListener链处理。

            Dispatcher内置两个线程池:channelExecutor和msgExecutor。

            channelExecutor用于处理通道事件和异常事件,考虑到在通道事件可能需要同步调用远程服务,因此此线程池不设上线(因为同步调用将会阻塞当前线程)。

            msgExecutor用于处理消息事件,根据经验值,缺省最大线程数为150,该值可以通过Option参数修改。

        6.EventListener

            EventListener有以下三种:

            1)ChannelEventListener


        Java代码  


          1. /**
          2.  * @Title: ChannelEventListener.java
          3.  * @Package com.gexin.hsf.netty.listener
          4.  * @Description: 通道事件监听类
          5.  * @author 
          6.  * @date 2011-9-27 上午11:45:50
          7.  * @version V1.0
          8.  */
          9. public interface ChannelEventListener extends
          10. /**
          11.      * Invoked when a {@link Channel} was closed and all its related resources were released.
          12.      * 
          13.      * @author
          14.      * @param ctx
          15.      * @param channel
          16.      * @param e
          17.      * @return EventBehavior Whether to continue the events deliver
          18.      */
          19. public
          20. /**
          21.      * Invoked when a {@link Channel} is open, bound to a local address, and connected to a remote address.
          22.      * 
          23.      * @author
          24.      * @param ctx
          25.      * @param channel
          26.      * @param e
          27.      * @return EventBehavior Whether to continue the events deliver
          28.      */
          29. public
          30. /**
          31.      * Invoked when a group is created.
          32.      * 
          33.      * @author
          34.      * @param ctx
          35.      * @param channel
          36.      * @param groupName
          37.      * @return EventBehavior Whether to continue the events deliver
          38.      */
          39. public
          40. /**
          41.      * Invoked when a group is removed.
          42.      * 
          43.      * @author
          44.      * @param ctx
          45.      * @param channel
          46.      * @param groupName
          47.      * @return EventBehavior Whether to continue the events deliver
          48.      */
          49. public
          50. }



               2)MessageEventListener


          Java代码  


          1. /**
          2.  * @Title: MessageListener.java
          3.  * @Package com.gexin.hsf.netty.listener
          4.  * @Description: 消息监听接口
          5.  * @author
          6.  * @date 2011-9-27 上午11:36:22
          7.  * @version V1.0
          8.  */
          9. public interface MessageEventListener extends
          10. /**
          11.      * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
          12.      * from a remote peer.
          13.      */
          14. public
          15. }



               3)ExceptionEventListener


          Java代码  


            1. /**
            2.  * @Title: ExceptionEventListener.java
            3.  * @Package com.gexin.hsf.netty.listener
            4.  * @Description: 异常监听接口
            5.  * @author
            6.  * @date 2011-9-27 上午11:48:09
            7.  * @version V1.0
            8.  */
            9. public interface ExceptionEventListener extends
            10. /**
            11.      * Invoked when an exception was raised by an I/O thread or a {@link ChannelHandler}.
            12.      */
            13. public
            14. }



                 Hsf框架会预先在EventListener链末端注册ServiceMessageEventListener,该Listener负责调用被注册的Service,并将返回值或异常回传。

                7.Service

                1)RemoteServiceContract注解

                所有实现了拥有RemoteServiceContract注解的Java类都可以直接注册到HsfService,示例如下:


            Java代码  


            1. @RemoteServiceContract
            2. public interface
            3.     String test(String ctx);  
            4. }  
            5.    
            6. public class TestServiceImpl implements
            7. @Override
            8. public
            9. return String.valueOf("hello "
            10.     }  
            11. }


                2)ServiceEntry

                对于未添加RemoteServiceContract注解的接口,Hsf框架使用org.summercool.hsf.pojo.ServiceEntry类实现注册。

               

                3)注册Service

                服务提供方需要向Hsf注册Service方可被远程调用,示例如下:

                ♦ 注册Service


            Java代码  

            1. HsfAcceptor acceptor = new
            2. // 注册Service
            3. acceptor.registerService(new
            4. // 监听端口
            5. acceptor.bind(8082);


                 ♦ 远程调用Service


            Java代码  

            1. HsfConnector connector = new
            2. connector.connect(new InetSocketAddress("127.0.0.1",8082));  
            3. // 同步方式
            4. TestService testService = ServiceProxyFactory.getRoundFactoryInstance(connector).wrapSyncProxy(TestService.class);  
            5. System.out.println(testService.test("HSF"));

                

                3)同步与异步

                4)原理

               7.Handshake

                当通道建立后,Client和Server会进行三次握手,以完成初始化

                初次握手步骤

                1)Client与Server建立连接成功

                2)Client向Server发送握手请求包(handshake request)

                3)Server接收到握手请求包后,生成group信息,然后触发groupCreated事件,接着向client发送握手反馈包(handshake ack)

                4)Client接收到握手反馈包后,生成group信息,然后触发groupCreated事件,接着向server发送握手完成包(handshake finish)

                非初次握手步骤

                1)Client与Server建立连接成功

                2)Client向Server发送握手请求包(handshake request)

                3)Server接收到握手请求包后,添加该连接到Group,接着向client发送握手反馈包(handshake ack)

                4)Client接收到握手反馈包后,添加该连接到Group,接着向server发送握手完成包(handshake finish)

                以上三次握手所发送的包都只包含本身的group信息,但Hsf对外提供了握手的扩展接口,应用可以使用该接口结合自身的业务,以完成连接建立后的初始化工作。

                Client握手扩展接口

               8.Heartbeat、超时及重连机制

                Heartbeat和超时机制依赖于Netty的读空闲和写空闲回调。

                当发生写空闲时,会向对方发送Heartbeat消息,写空闲时间可以通过参数HsfOptions.WRITE_IDLE_TIME设定,缺省为10秒。

                当发生读空闲时,即判定为超时,主动关闭连接,读空闲时间可以通过参数HsfOptions.READ_IDLE_TIME设定,缺省为60秒。

                对于断开的连接,Hsf会为其重连,重连频率通过HsfOptions.RECONNECT_INTERVAL参数设定,缺省为10000毫秒。

               9.Option参数

                Hsf支持以参数配置:


            参数名 说明 缺省值


            HsfOptions.TCP_NO_DELAY

            TCP参数,是否关闭延迟发送消息包

            true

            HsfOptions.KEEP_ALIVE

            TCP参数,是否保持连接

            true

            HsfOptions.REUSE_ADDRESS

            TCP参数,是否重用端口

            false

            HsfOptions.WRITE_IDLE_TIME

            写空闲时间(秒)

            10

            HsfOptions.READ_IDLE_TIME

            读空闲时间(秒)

            60

            HsfOptions.SYNC_INVOKE_TIMEOUT

            同步调用超时时间(毫秒)

            60000

            HsfOptions.HANDSHAKE_TIMEOUT

            握手超时时间(毫秒)

            15000

            HsfOptions.FLOW_LIMIT

            流量限额

            2000000

            HsfOptions.TIMEOUT_WHEN_FLOW_EXCEEDED

            申请流量超时时间(毫秒)

            3000

            HsfOptions.MAX_THREAD_NUM_OF_DISPATCHER

            分发器的最大线程数

            150

            HsfOptions.CHANNEL_NUM_PER_GROUP

            每个Group建立的通道数

            Runtime.getRuntime().availableProcessors()

            HsfOptions.RECONNECT_INTERVAL

            重连频率(毫秒)

            10000

            HsfOptions.CONNECT_TIMEOUT

            建立连接超时时间(毫秒)

            30000

            HsfOptions.HOLD_CALLBACK_MESSAGE

            是否缓存Callback方式发送的消息,缓存后将会在发送失败时回调doException方法参数传入

            false



                这些参数可以通过如下方式调整:


            Java代码  


            1. HsfConnector connector = new
            2. connector.setOption(HsfOptions.CHANNEL_NUM_PER_GROUP, 1);


            举报

            相关推荐

            0 条评论