0
点赞
收藏
分享

微信扫一扫

java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置


目录

​​点对点​​

​​点对点反向​​

​​群发​​

​​maven 引入依赖​​

​​java netty 主启动类 代码​​

​​java netty 初始化类 代码​​

​​java netty 通道类 代码​​

​​java netty 通讯上下文管理类​​

​​html 网页代码​​

​​实例代码和jar包​​

​​附上一个netty 启动jar  jvm ​​

​​jvm常见配置汇总介绍​​

点对点

java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置_服务端

 

点对点反向

java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置_服务端_02

 

群发

java netty and html 实现点对点 即时通讯 点对群及时通讯 实例代码 和jvm设置_客户端_03

 

maven 引入依赖

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>

 

 

java netty 主启动类 代码

package com.superman.testnetty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* 程序的入口,负责启动应用
*Timely communication
*/
public class TCNettyServer {

private int port;

public TCNettyServer(int port) {
this.port = port;
}

public void run() {
// 启用两个Reactor线程池【netty是基于NIO的,基于线程处理的】
// 用于接收Client端连接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 进行网络通信读写
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup);
// 指定使用NioServerSocketChannel这种类型的通道
b.channel(NioServerSocketChannel.class);
/*
* 使用 childHandler 去初始化服务器 添加handler,用来监听已经连接的客户端的Channel的动作和状态。
*
* 被绑定的MyWebSocketChannelHandler()里面设置了服务端初始化参数以及
*/
b.childHandler(new TCWebSocketChannelHandler());

System.out.println("netty服务端开启等待客户端连接....");
Channel ch = b.bind(port).sync().channel();
ch.closeFuture().sync();

} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new TCNettyServer(8888).run();
}

}

  • springboot 类启动的时候调用 new TCNettyServer(8888).run();

 

java netty 初始化类 代码

package com.superman.testnetty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
* 初始化连接时候的各个组件
* Timely communication
*
*/
public class TCWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec", new HttpServerCodec());
e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 添加具体的处理器。可以addLast(或者addFirst)多个handler,
// 第一个参数是名字,无具体要求,如果填写null,系统会自动命名。
e.pipeline().addLast("handler", new TCWebSocketHandler());
/**通过使用管道的ChannelPipeline方式来处理请求
* 第一个配置的管道先处理,然后移交给下一个管道来处理,在每个管道处理中
* 各个handler可以决定是否继续或中断
* ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器{@link ChannelPipeline}
* Netty中的事件分为inbound事件和outbound事件。
* inbound事件通常由I/O线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。方法名以file开始{@link ChannelHandlerContext}
* outbound事件类似于发送、刷新、断开连接、绑定本地地址等关闭channel
*/
}



}

 

java netty 通道类 代码

package com.superman.testnetty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.MemoryAttribute;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 接收/处理/响应客户端websocket请求的核心业务处理类
* 通过添加hanlder,我们可以监听Channel的各种动作以及状态的改变,包括连接,绑定,接收消息等。
*
* Timely communication
*/
public class TCWebSocketHandler extends SimpleChannelInboundHandler<Object> {

// 用于服务器端web套接字打开和关闭握手
private WebSocketServerHandshaker handshaker;

private static final String WEB_SOCKET_URL = "/websocket";


//客户端与服务端创建连接的时候调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
TCChannelManage.group.add(ctx.channel());
System.out.println("客户端与服务端连接开启,客户端remoteAddress:" + ctx.channel().remoteAddress());
}

//客户端与服务端断开连接的时候调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TCChannelManage.group.remove(ctx.channel());
System.out.println("客户端与服务端连接关闭...");
}

//服务端接收客户端发送过来的数据结束之后调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

//工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

//服务端处理客户端websocket请求的核心方法
protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {

/* 传统的HTTP接入(采用http处理方式)
* 第一次握手请求消息由HTTP协议承载,所以它是一个HTTP消息,
* 握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
* 执行handleHttpRequest方法来处理WebSocket握手请求。
*/

// FullHttpRequest是完整的 HTTP请求,协议头和Form数据是在一起的,不用分开读
if (msg instanceof FullHttpRequest) {
handHttpRequest(context, (FullHttpRequest) msg);
}
/**
* WebSocket接入(采用socket处理方式)
* 提交请求消息给服务端,
* WebSocketServerHandler接收到的是已经解码后的WebSocketFrame消息。
*/
else if (msg instanceof WebSocketFrame) {
handWebsocketFrame(context, (WebSocketFrame) msg);
}
/**
* Websocket的数据传输是frame形式传输的,比如会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:
*
* 1)大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。
*
* 2)和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。
*/
}

/**
* 处理客户端与服务端之前的websocket业务
*
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

//判断是否是关闭websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
}

//判断是否是ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}

if (frame instanceof TextWebSocketFrame) {

// 返回应答消息
String requestMsg = ((TextWebSocketFrame) frame).text();
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "的消息==》" + requestMsg);
String[] array = requestMsg.split(",");
// 先判断通道管理器中是否存在该通道,没有则添加进去
if (!TCChannelManage.hasChannel(array[0])) {
TCChannelManage.userIdAndChannelMap.put(array[0], ctx.channel());
}

if (array[0].length() != 0 && array[1].length() != 0) {
TCChannelManage.send(array[0], array[1], array[2], ctx.channel());
} else if (array[0].length() != 0 && array[1].length() == 0) {
//如果没有指定接收者表示群发array.length() = 2
System.out.println("用户" + array[0] + "群发了一条消息:" + array[2]);
TCChannelManage.group.writeAndFlush(new TextWebSocketFrame("用户" + array[0] + "群发了一条消息:" + array[2]));
} else {
//如果没有指定发送者与接收者表示向服务端发送array.length() = 1
System.out.println("服务端接收用户" + ctx.channel().remoteAddress() + "消息,不再发送出去");
ctx.writeAndFlush(new TextWebSocketFrame("你向服务端发送了消息==》" + array[2]));
}
}
}


/**
* 处理客户端向服务端发起http握手请求的业务
*
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
System.out.println("处理http请求,http方法==>>" + req.getMethod() + ",http地址==>>" + req.getUri());
Map<String, String> parmMap = new HashMap<>();
try {
parmMap = parse(req);
} catch (IOException e) {
e.printStackTrace();
}
// 如果不是WebSocket握手请求消息,那么就返回 HTTP 400 BAD REQUEST 响应给客户端。
if (!req.getDecoderResult().isSuccess()
|| !("websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//如果是握手请求,那么就进行握手
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else {
// 通过它构造握手响应消息返回给客户端,
// 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码,
// 添加WebSocketEncoder和WebSocketDecoder之后,服务端就可以自动对WebSocket消息进行编解码了
handshaker.handshake(ctx.channel(), req);
}
}

/**
* 服务端向客户端响应消息
*
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}

/**
* 解析GET、POST请求参数
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
*/
public Map<String, String> parse(FullHttpRequest fullReq) throws IOException {

HttpMethod method = fullReq.getMethod();

Map<String, String> parmMap = new HashMap<>();

if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.getUri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(
new DefaultHttpDataFactory(false), fullReq);
List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
for(InterfaceHttpData data:postData){
if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
MemoryAttribute attribute = (MemoryAttribute) data;
parmMap.put(attribute.getName(), attribute.getValue());
}
}
} else {
// 不支持其它方法
System.out.println("不支持其他方法提交的参数");
}

return parmMap;
}

@Override
protected void channelRead0(ChannelHandlerContext arg0, Object arg1)
throws Exception {
messageReceived(arg0,arg1);
}
}

  • 这个类主要处理通讯时候寻找对应的通道

 

java netty 通讯上下文管理类

package com.superman.testnetty;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 存储整个工程的全局配置
* Timely communication
*
*/
public class TCChannelManage {

/**
* 存储每一个客户端接入进来时的channel对象
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// 读锁
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

public static ConcurrentMap<String, Channel> userIdAndChannelMap = new ConcurrentHashMap<>();

public static void send(String senderId, String receiverId, String message, Channel senderChannel) {
// 发送肯定是A要给B发,A就是发消息的对象,B可以是人,机器等对象
try {
rwLock.readLock().lock();
// 1.寻找receiverId的channel
Channel receiverChannel = userIdAndChannelMap.get(receiverId);
if (receiverChannel == null) {
// 使用发送者的通道告知发送者,你要发的那个人不在线
senderChannel.writeAndFlush(new TextWebSocketFrame(receiverId + "不在线"));
return;
}
// 2.发送。A给B发,B若要收到消息,其实是通过B的channel给B发消息
receiverChannel.writeAndFlush(new TextWebSocketFrame(senderId + "发来消息===》" + message));
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}

public static boolean hasChannel(String id) {
Channel channel = userIdAndChannelMap.get(id);
if (channel == null) {
return false;
} else {
return true;
}
}

}

  • 通道存储
  • 和添加删除啥的用在这即可

 

 

html 网页代码

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
<title>WebSocket客户端</title>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}

if(window.WebSocket){
socket = new WebSocket("ws://localhost:8888/websocket");
socket.onmessage = function(event){
var ta = document.getElementById('responseContent');
ta.value += event.data + "\r\n";
};

socket.onopen = function(event){
var ta = document.getElementById('responseContent');
ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";
};

socket.onclose = function(event){
var ta = document.getElementById('responseContent');
ta.value = "";
ta.value = "WebSocket连接已经关闭\r\n";
};
}else{
alert("您的浏览器不支持WebSocket");
}


function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功!!");
}
}
</script>
</head>
<body>
<H3>及时唠嗑</H3>
<form onSubmit="return false;">
<input type = "text" name = "senderId" placeholder="发送者唯一id"/>
<br/><br/>
<input type = "text" name = "receiverId" placeholder="接收者唯一id"/>
<br/><br/>
<input type = "text" name = "message" placeholder="消息内容"/>
<br/><br/>
<input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.senderId.value+','+this.form.receiverId.value+','+this.form.message.value)"/>
<hr color="red"/>
<h2>客户端接收到服务端返回的应答消息</h2>
<textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
</form>
</body>
</html>

 

实例代码和jar包

​​javascript:void(0)​​

 

 

附上一个netty 启动jar  jvm 

java -jar -server -Xms4G -Xmx4G -XX:NewSize=3584m -XX:PermSize=64m -XX:SurvivorRatio=1 -XX:+UseParallelGC -XX:-UseAdaptiveSizePolicy  

  • java -jar -server 
  • -Xms4G:初始堆大小,设置JVM促使内存为4G。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。
  •  -Xmx4G  :最大堆大小设置JVM最大可用内存为4G。
  •  -XX:NewSize=3584m :设置年轻代大小
  • -XX:PermSize=64m :设置持久代大小为
  • -XX:SurvivorRatio=1  :年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5
  • -XX:+UseParallelGC  :设置并行收集器
  • -XX:+UseAdaptiveSizePolicy :设置此选项后,并行收集器会自动选择年轻代区大小和相应的Survivor区比例,以达到目标系统规定的最低相应时间或者收集频率等,此值建议使用并行收集器时,一直打开。

 

jvm常见配置汇总介绍

  1. 堆设置
  • -Xms:初始堆大小
  • -Xmx:最大堆大小
  • -XX:NewSize=n:设置年轻代大小
  • -XX:NewRatio=n:设置年轻代和年老代的比值。如:为3,表示年轻代与年老代比值为1:3,年轻代占整个年轻代年老代和的1/4
  • -XX:SurvivorRatio=n:年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5
  • -XX:MaxPermSize=n:设置持久代大小
  1. 收集器设置
  • -XX:+UseSerialGC:设置串行收集器
  • -XX:+UseParallelGC:设置并行收集器
  • -XX:+UseParalledlOldGC:设置并行年老代收集器
  • -XX:+UseConcMarkSweepGC:设置并发收集器
  1. 垃圾回收统计信息
  • -XX:+PrintGC
  • -XX:+PrintGCDetails
  • -XX:+PrintGCTimeStamps
  • -Xloggc:filename
  1. 并行收集器设置
  • -XX:ParallelGCThreads=n:设置并行收集器收集时使用的CPU数。并行收集线程数。
  • -XX:MaxGCPauseMillis=n:设置并行收集最大暂停时间
  • -XX:GCTimeRatio=n:设置垃圾回收时间占程序运行时间的百分比。公式为1/(1+n)
  1. 并发收集器设置
  • -XX:+CMSIncrementalMode:设置为增量模式。适用于单CPU情况。
  • -XX:ParallelGCThreads=n:设置并发收集器年轻代收集方式为并行收集时,使用的CPU数。并行收集线程数。

 

轻松处理器支持1w往上

 

 

ok

 

 

持续更新

 

 

 

 

 

举报

相关推荐

0 条评论