目录
介绍
spring boot 结合 netty 启动 查看
springboot 依赖查看
这里介绍 及时通讯内容
IMNettyServer
IMNettyServerInitialzer
IMNettyHandler
IMNettyChanneManage
页面代码
启动日志
页面效果
介绍
采用 用户 1 发给用户 2 通过 通道方式发送消息
1.每个用户连接后进行记录通道
2.新用户发送消息,服务寻找通道,找到通道发送,没找到通道返回已发送,未读
这里介绍 及时通讯内容
IMNettyServer
package com.superman.nettyim;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* 及时通讯服务端
*
* @author jianghy
*
* @version 1.0
*/
public class IMNettyServer {
private int port = 8088;
public IMNettyServer(int port) {
this.port = port;
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println("----------------- netty Server Start--------------------");
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
//在这里,我们指定使用NioServerSocketChannel用于实例化新的类Channel以接受传入的连接。
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new IMNettyServerInitialzer())
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
ChannelFuture f = b.bind(port).sync(); // (7)
System.out.println("-------------------netty server port:"+port+" start success -----------");
// 等待服务器套接字关闭。
//在本例中,这种情况不会发生,但您可以这样做
//优雅地
//关闭服务器。
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new IMNettyServer(8088).run();
}
}
IMNettyServerInitialzer
.
package com.superman.nettyim;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* 初始化类
*
* @author jianghy
*
* @version 1.0
*/
public class IMNettyServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
// 几乎在netty中的编程,都会使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024*64));
// ====================== 以上是用于支持http协议 ======================
// ====================== 以下是支持httpWebsocket ======================
/**
* websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
* 本handler会帮你处理一些繁重的复杂的事
* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new IMNettyHandler());
}
}
IMNettyHandler
package com.superman.nettyim;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import com.superman.netty.im.TCChannelManage;
/**
* 通道处理消息类
*
* @author jianghy
*
* @version 1.0
*/
public class IMNettyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
//1.获取客户端传来信息
String requestMsg = msg.text();
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "的消息==》" + requestMsg);
/**
* 2.解析数据 单人对单人的发送服务,管理员可以进行群发 (要校验权限)
* array[0] sendUserId 发送人id
* array[1] sendUserToken 发送人user_online_token
* array[2] receivedUserType 接受用户类型 (0全部、1单人、2某群)
* array[3] receivedUserId 接收人id(null、单人id、群发id)
* array[4] message 消息内容
*
*/
String[] array = requestMsg.split(",");
messageManagementService(ctx, msg, array);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
IMNettyChanneManage.clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
IMNettyChanneManage.clients.remove(ctx.channel());
System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
System.out.println("客户端断开,channle对应的短id为:" + ctx.channel().id().asShortText());
}
/**
* 消息管理处理服务
*
* @param ctx
* @param msg
* @param array
* array[0] sendUserId 发送人id
* array[1] sendUserToken 发送人user_online_token
* array[2] receivedUserType 接受用户类型 (0全部、1单人、2某群)
* array[3] receivedUserId 接收人id(null、单人id、群发id)
* array[4] message 消息内容
*
*/
public void messageManagementService(ChannelHandlerContext ctx, TextWebSocketFrame msg,String[] array){
//3.校验数据格式正确
if(array.length!=5){
ctx.writeAndFlush(new TextWebSocketFrame("{state:非法请求}"));
return;
}
//4.TODO 校验在线令牌
if (array[0].length() != 0 && array[1].length() != 0) {//校验令牌
//TODO -----------读取redis进行校验,异常return掉-----------
//ctx.writeAndFlush(new TextWebSocketFrame("{state:'illegal token'}"));
//return;
}else{//在线用户数据异常
ctx.writeAndFlush(new TextWebSocketFrame("{state:'illegal token'}"));
return;
}
//5.判断now user channel 是否在 本地缓存组中
if (!TCChannelManage.hasChannel(array[0])) {//没有添加通道
IMNettyChanneManage.userIdAndChannelMap.put(array[0], ctx.channel());
}
//6.发送类型是否正确
if(array[2].length() == 0 ){//非法请求丢弃消息
ctx.writeAndFlush(new TextWebSocketFrame("{state:'非法请求'}"));
return;
}
//7.判断发送消息类型
if(array[2].toString().equals("1")){//8.单人
IMNettyChanneManage.sendOnlyUserInfo(array[0], array[3], array[4], ctx.channel());
}else if(array[2].toString().equals("2")){//9.群发
//TODO 1.数据库通过群号,找到用户和当前群的所有用户数据
//2.把能发送的发送,不能发送的存放到数据库中,每次用户来了先拉去未读消息
//3.循环吧每个通道中有的在线用户信息发出去
}else if(array[2].toString().equals("0")){//10.所有用户
//TODO 校验权限 管理员权限发送所有 用户
System.out.println("用户" + array[0] + "群发了一条消息:" + array[4]);
IMNettyChanneManage.clients.writeAndFlush(new TextWebSocketFrame("用户" + array[0] + "群发了一条消息:" + array[4]));
}else{//11.请求参数不正确,反馈信息,非法请求
System.out.println("user:" + ctx.channel().remoteAddress() + "消息发送类型异常");
ctx.writeAndFlush(new TextWebSocketFrame("{state:'非法请求'}"));
}
}
}
IMNettyChanneManage
package com.superman.nettyim;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 消息发送集合存储类
*
* @author jianghy
*
* @version 1.0
*/
public class IMNettyChanneManage {
/*
* 通道组,用来广播使用
*/
public static ChannelGroup clients = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE);
//读锁
private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
//通道集合
public static ConcurrentMap<String, Channel> userIdAndChannelMap = new ConcurrentHashMap<>();
/**
* 发给单个人送消息服务
*
* @param senderId 发送人id
* @param receiverId 收信人id
* @param message 信息
* @param senderChannel 发件人内容(如果通道中没找到收件人,提示原发送人,消息已发送未读)
*
*/
public static void sendOnlyUserInfo(String senderId, String receiverId, String message, Channel senderChannel) {
try {
rwLock.readLock().lock();//开启锁
//1.获取收信人通道
Channel receiverChannel = userIdAndChannelMap.get(receiverId);
if (receiverChannel == null) {//收件人不在线
//TODO 将数据记录到数据库,等用户上线拉去信息
senderChannel.writeAndFlush(new TextWebSocketFrame(receiverId + "已发送,未读"));
return;
}
// 2.通过收信人通道发送消息 (发送人id,发送类型,消息)
receiverChannel.writeAndFlush(new TextWebSocketFrame(senderId + ",1," + message));
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();//关闭锁
}
}
/**
* 判断通道组中是否用当前用户
*
* @param id 用户id
*
* @return boolean
*/
public static boolean hasChannel(String id) {
Channel channel = userIdAndChannelMap.get(id);
if (channel == null) {
return false;
} else {
return true;
}
}
}
页面代码
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title></title>
</head>
<body>
<h3>在线客服</h3>
<p>
发送人id: <input type="text" id="msg0" /><br />
发送人token:<input type="text" id="msg1" /><br />
收信人类型:<input type="text" id="msg2" value="1" /><br />
收信人id:<input type="text" id="msg3" /><br />
信息:<input type="text" id="msg4" /><br />
<input type="button" value="点我发送" onclick="CHAT.chat()" />
</p>
<div>消息:</div>
<div id="receiveMsg" style="background-color: gainsboro;"></div>
<script type="application/javascript">
window.CHAT = {
socket: null,
init: function () {
if (window.WebSocket) {
CHAT.socket = new WebSocket("ws://127.0.0.1:9019/ws");
CHAT.socket.onopen = function () {
console.log("连接建立成功...");
},
CHAT.socket.onclose = function () {
console.log("连接关闭...");
},
CHAT.socket.onerror = function () {
console.log("发生错误...");
},
CHAT.socket.onmessage = function (e) {
console.log("接受到消息:" + e.data);
var receiveMsg = document.getElementById("receiveMsg");
var html = receiveMsg.innerHTML;
receiveMsg.innerHTML = html + "<br/>" + "接受到消息:" + e.data;
}
} else {
alert("浏览器不支持websocket协议...");
}
},
chat: function () {
var msg0 = document.getElementById("msg0").value;
var msg1 = document.getElementById("msg1").value;
var msg2 = document.getElementById("msg2").value;
var msg3 = document.getElementById("msg3").value;
var msg4 = document.getElementById("msg4").value;
CHAT.socket.send(msg0 + "," + msg1 + "," + msg2 + "," + msg3 + "," + msg4);
var receiveMsg = document.getElementById("receiveMsg");
var html = receiveMsg.innerHTML;
receiveMsg.innerHTML = html + "<br/>" + "发送消息:" + msg4;
}
};
CHAT.init();
</script>
</body>
</html>
启动日志
The Class-Path manifest attribute in D:\apache-maven-3.5.3\apache-maven-3.5.3\maven_repository\com\drewnoakes\metadata-extractor\2.14.0\metadata-extractor-2.14.0.jar referenced one or more files that do not exist: file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/com/drewnoakes/metadata-extractor/2.14.0/xmpcore-6.0.6.jar
The Class-Path manifest attribute in D:\apache-maven-3.5.3\apache-maven-3.5.3\maven_repository\org\htmlparser\htmlparser\2.1\htmlparser-2.1.jar referenced one or more files that do not exist: file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/org/htmlparser/htmlparser/2.1/sax-2.0.1.jar,file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/org/htmlparser/htmlparser/2.1/htmllexer-2.1.jar
The Class-Path manifest attribute in D:\apache-maven-3.5.3\apache-maven-3.5.3\maven_repository\com\sun\xml\bind\jaxb-impl\2.1\jaxb-impl-2.1.jar referenced one or more files that do not exist: file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/com/sun/xml/bind/jaxb-impl/2.1/jaxb-api.jar,file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/com/sun/xml/bind/jaxb-impl/2.1/activation.jar,file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/com/sun/xml/bind/jaxb-impl/2.1/jsr173_1.0_api.jar,file:/D:/apache-maven-3.5.3/apache-maven-3.5.3/maven_repository/com/sun/xml/bind/jaxb-impl/2.1/jaxb1-impl.jar
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.7.RELEASE)
2020-11-23 15:04:23.975 INFO com.superman.Provider_App - Starting Provider_App on UJ8XUW5KCNHO65F with PID 7212 (D:\eclipseworks_2020\superporject\super_netty-im\target\classes started by Administrator in D:\eclipseworks_2020\superporject\super_netty-im)
2020-11-23 15:04:23.979 INFO com.superman.Provider_App - No active profile set, falling back to default profiles: default
2020-11-23 15:04:24.060 INFO org.springframework.boot.devtools.env.DevToolsPropertyDefaultsPostProcessor - Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-11-23 15:04:24.061 INFO org.springframework.boot.devtools.env.DevToolsPropertyDefaultsPostProcessor - For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2020-11-23 15:04:27.286 INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$a421cf07] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-11-23 15:04:29.690 INFO org.springframework.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 9018 (https)
2020-11-23 15:04:29.723 INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["https-jsse-nio-9018"]
2020-11-23 15:04:29.740 INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2020-11-23 15:04:29.740 INFO org.apache.catalina.core.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.22]
2020-11-23 15:04:30.136 INFO org.apache.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
2020-11-23 15:04:30.136 INFO org.springframework.web.context.ContextLoader - Root WebApplicationContext: initialization completed in 6064 ms
2020-11-23 15:04:32.193 INFO org.springframework.boot.devtools.autoconfigure.OptionalLiveReloadServer - LiveReload server is running on port 35729
2020-11-23 15:04:32.305 INFO springfox.documentation.spring.web.PropertySourcedRequestMappingHandlerMapping - Mapped URL path [/v2/api-docs] onto method [public org.springframework.http.ResponseEntity<springfox.documentation.spring.web.json.Json> springfox.documentation.swagger2.web.Swagger2Controller.getDocumentation(java.lang.String,javax.servlet.http.HttpServletRequest)]
2020-11-23 15:04:32.337 INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
2020-11-23 15:04:32.339 INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'getExecutor'
2020-11-23 15:04:32.346 INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
2020-11-23 15:04:32.347 INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'getAgentExecutor'
2020-11-23 15:04:32.813 INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2020-11-23 15:04:33.106 INFO springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper - Context refreshed
2020-11-23 15:04:33.155 INFO springfox.documentation.spring.web.plugins.DocumentationPluginsBootstrapper - Found 1 custom documentation plugin(s)
2020-11-23 15:04:33.260 INFO springfox.documentation.spring.web.scanners.ApiListingReferenceScanner - Scanning for api listing references
2020-11-23 15:04:33.321 INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["https-jsse-nio-9018"]
2020-11-23 15:04:35.178 INFO org.springframework.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 9018 (https) with context path ''
2020-11-23 15:04:35.182 INFO com.superman.Provider_App - Started Provider_App in 12.22 seconds (JVM running for 13.019)
-------------------netty server port:9019 start success -----------
页面效果
ok
持续更新