0
点赞
收藏
分享

微信扫一扫

netty的个人使用心得


Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。



如果需要客户端和服务器端沟通 分别都需要编写一个 实现了SimpleChannelHandler接口的类,其中类中需要重写的主要方法为

channelConnected() and channelOpen() 这两个方法为 当客户端链接到服务器端得时候和 客户端 channel被创建出来的时候所调用的



channelDisconnected and channelClosed() 对应上面的两个方法



exceptionCaught 可以获得 对应handler端(服务器或客户端)的异常信息



messageReceived 每个 客户端 发送的信息后 将调用此方法



当编写完某端得程序后(客户端或服务器端) 将编写好的handler需要配置在 实现了ChannelPipelineFactory的类里,ChannelPipelineFactory中有一个需要实现的方法getPipeline将写好的handler配置到其中,在这个 工厂里 可能要添加很多东西

比如说 编解码器,心跳等。。。。



如需要自定义编解码器需要继承:LengthFieldBasedFrameDecoder(解码),OneToOneEncoder(编码)



编解码器(encode,decode)

encode为 调用messageReceived 方法之后调用的方法,则decode方法为 messageReceived 之前调用的方法 ,用于处理自定义包协议的解析于编辑



心跳: 当客户端socket在非正常情况家掉线,如: 断网,断电等特殊问题的时候, 客户端的channel对象不会自动关闭,需要一直接收到客户端的消息,从而判断是否可以和对象构成通信。。 如果 发现客户端空闲时间过长则视为掉线







服务端handler代码如下


package com.djyou.server; 


import java.util.logging.Logger; 


import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.channel.ChannelStateEvent; 

import org.jboss.netty.channel.ChildChannelStateEvent; 

import org.jboss.netty.channel.ExceptionEvent; 

import org.jboss.netty.channel.MessageEvent; 

import org.jboss.netty.channel.SimpleChannelHandler; 

import org.jboss.netty.channel.group.ChannelGroup; 

import org.jboss.netty.channel.group.DefaultChannelGroup; 



public class ChatServerHandler extends SimpleChannelHandler{ 


 public static final ChannelGroup channelGroup = new DefaultChannelGroup(); 


 public int id; 


 @Override 

 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) 

 throws Exception { 

 System.out.println("进来一个"); 

 } 


 @Override 

 public void channelDisconnected(ChannelHandlerContext ctx, 

 ChannelStateEvent e) throws Exception { 

 super.channelDisconnected(ctx, e); 

 } 


 @Override 

 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 

 throws Exception { 

 Logger.getAnonymousLogger().info(e.getCause().getMessage()); 

 ctx.getChannel().close(); 

 // TODO Auto-generated method stub 

 //super.exceptionCaught(ctx, e); 

 } 


 @Override 

 public void childChannelClosed(ChannelHandlerContext ctx, 

 ChildChannelStateEvent e) throws Exception { 


 super.childChannelClosed(ctx, e); 

 } 


 @Override 

 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 

 throws Exception { 

 System.out.println(this.id++); 


 //google protocol解码后返回为 ChannelBuffer类型 

 if(!(e.getMessage() instanceof ChannelBuffer)) return; 

 //获得 消息对象 

 ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage(); 


 //MessageInfo info = Message.MessageInfo.newBuilder().mergeFrom(channelBuffer.copy().array()).build(); 

 //写回给客户端 

 e.getChannel().write(channelBuffer); 



 } 



}




pieplelineFactory里的代码为


package com.djyou.server; 

import static org.jboss.netty.channel.Channels.*; 

import org.jboss.netty.channel.ChannelPipeline; 

import org.jboss.netty.channel.ChannelPipelineFactory; 

import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 

import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 

import org.jboss.netty.handler.timeout.IdleStateHandler; 

import org.jboss.netty.util.Timer; 


public class ChatPipelineServerFactory implements ChannelPipelineFactory{ 


 private Timer timer; 


 public ChatPipelineServerFactory(Timer timer){ 


 this.timer = timer; 

 } 


 @Override 

 public ChannelPipeline getPipeline() throws Exception { 


 ChannelPipeline pipeline = pipeline(); 


 //添加netty默认支持的 编解码器(可自动添加包头,并处理粘包问题) 


pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());//对应 

 pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());//此对象为 netty默认支持protocolbuf的编解码器 

 pipeline.addLast("timeout", new IdleStateHandler(timer, 10, 10, 0));//此两项为添加心跳机制 10秒查看一次在线的客户端channel是否空闲,IdleStateHandler为netty jar包中提供的类 

 pipeline.addLast("hearbeat", new Heartbeat());//此类 实现了IdleStateAwareChannelHandler接口 


 //netty会定时扫描 空闲的channel 

 //pipeline.addLast("frameDecoder", new ProtobufDecoder(Message.MessageInfo.getDefaultInstance())); 

 //pipeline.addLast("frameEncoder", new ProtobufEncoder());// 

 pipeline.addLast("handler", new ChatServerHandler());//将编写好的服务器端的handler添加到这里 



 return pipeline; 

 } 


}




心跳包的代码如下


import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.handler.timeout.IdleState; 

import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; 

import org.jboss.netty.handler.timeout.IdleStateEvent; 

public class Heartbeat extends IdleStateAwareChannelHandler{ 


 int i = 0; 


 @Override 

 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) 

 throws Exception { 

 // TODO Auto-generated method stub 

 super.channelIdle(ctx, e); 


 if(e.getState() == IdleState.WRITER_IDLE) 

 i++; 


 if(i==3){ 

 e.getChannel().close(); 


 System.out.println("掉了。"); 

 } 

 } 



}



自定义解码器代码



package com.djyou.server; 


import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.channel.Channel; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; 


public class Decode extends LengthFieldBasedFrameDecoder{ 


 public Decode(int maxFrameLength, int lengthFieldOffset, 

 int lengthFieldLength) { 

 super(maxFrameLength, lengthFieldOffset, lengthFieldLength); 

 // TODO Auto-generated constructor stub 

 } 


 @Override 

 protected Object decode(ChannelHandlerContext ctx, Channel channel, 

 ChannelBuffer buffer) throws Exception { 


 ChannelBuffer buffs = (ChannelBuffer)super.decode(ctx, channel, buffer); 



 return buffs; 

 } 


}





自定义编码器代码



package com.djyou.server; 


import org.jboss.netty.channel.Channel; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; 


public class Encode extends OneToOneEncoder{ 


 @Override 

 protected Object encode(ChannelHandlerContext ctx, Channel channel, 

 Object msg) throws Exception { 

 // TODO Auto-generated method stub 

 return null; 

 } 


}


服务端启动代码





package com.djyou.server; 


import java.net.InetSocketAddress; 

import java.util.concurrent.Executors; 


import org.jboss.netty.bootstrap.ServerBootstrap; 

import org.jboss.netty.channel.ChannelFactory; 

import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 

import org.jboss.netty.util.HashedWheelTimer; 

import org.jboss.netty.util.Timer; 


public class ChatServer { 

 public static void main(String[] args) { 

 ChannelFactory factory = new NioServerSocketChannelFactory(Executors 

 .newCachedThreadPool(), Executors.newCachedThreadPool(), 

 Runtime.getRuntime().availableProcessors() + 1); 


 Timer timer = new HashedWheelTimer(); 


 ServerBootstrap bootstrap = new ServerBootstrap(factory); 

 bootstrap.setPipelineFactory(new ChatPipelineServerFactory(timer)); 

 bootstrap.setOption("child.tcpNoDelay", true); 

 bootstrap.setOption("child.keepAlive", true); 

 bootstrap.setOption("reuseAddress", true); 


 bootstrap.bind(new InetSocketAddress(6666)); 

 } 

}


客户端启动代码如下(除客户端启动代码意外 其余的东西都与服务器端一样 都需要编写对应的 编解码器,定时发送消息线程(10秒发个信息给服务端 确保channel不为空闲, 来对应心跳程序), 客户端的handler)


package com.djyou.client; 


import java.io.BufferedReader; 

import java.io.InputStreamReader; 

import java.net.InetSocketAddress; 

import java.util.concurrent.Executors; 


import org.jboss.netty.bootstrap.ClientBootstrap; 

import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.buffer.ChannelBuffers; 

import org.jboss.netty.channel.Channel; 

import org.jboss.netty.channel.ChannelFactory; 

import org.jboss.netty.channel.ChannelFuture; 

import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 


import com.djyou.protoBufModel.Message; 

import com.djyou.protoBufModel.Message.MessageInfo; 


public class ChatClient { 


 public static void main(String[] args) throws Exception{ 

 String host = "localhost"; 

 int port = 6666; 


 // Configure the client. 

 ChannelFactory factory = 

 new NioClientSocketChannelFactory( 

 Executors.newCachedThreadPool(), 

 Executors.newCachedThreadPool()); 


 ClientBootstrap bootstrap = new ClientBootstrap(factory); 

 ChatPipelineClientFactory cpcf = new ChatPipelineClientFactory(); 

 bootstrap.setPipelineFactory(cpcf); 


 // Start the connection attempt. 

 ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); 


 // Wait until the connection attempt succeeds or fails. 

 Channel channel = future.awaitUninterruptibly().getChannel(); 

 if (!future.isSuccess()) { 

 future.getCause().printStackTrace(); 

 System.exit(0); 

 } 


 // Read commands from the stdin. 

 ChannelFuture lastWriteFuture = null; 

 BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); 

// for (;;) { 

// String line = in.readLine(); 

// if (line == null) { 

// break; 

// } 


 //创建Builder 

 MessageInfo.Builder builder = MessageInfo.newBuilder(); 


 builder.addBody(Message.Body.newBuilder().setKey("message").setValue("你在干什么?" + "/n").build()); 


 //创建 赋值结束的 Build 并生成 MessageInfo对象 

 MessageInfo messageInfo = builder.build(); 

 //将messageInfo转换为字节 

 byte[] messageByte = messageInfo.toByteArray(); 


 //获得此对象的长度 

 ChannelBuffer channelBuffer = ChannelBuffers.buffer(messageByte.length); 

 //将 获得到的数组写入 channelBuffer中 

 channelBuffer.writeBytes(messageByte); 

 //发送到服务器端 

 for(int i = 0; i < 10;i++){ 

 lastWriteFuture = channel.write(channelBuffer); 

 } 

 if (lastWriteFuture != null) { 

 lastWriteFuture.awaitUninterruptibly(); 

 } 

 // } 

 // Thread.sleep(50000); 

 // Wait until all messages are flushed before closing the channel. 


 Thread.sleep(50000); 

 // Close the connection. Make sure the close operation ends because 

 // all I/O operations are asynchronous in Netty. 

 channel.close().awaitUninterruptibly(); 


 // We should shut down all thread pools here to exit normally. 

 // However, it is just fine to call System.exit(0) because we are 

 // finished with the business. 

 System.exit(0); 

 } 

}

举报

相关推荐

0 条评论