0
点赞
收藏
分享

微信扫一扫

Netty(七)之聊天室小小小案例


需求

1)有客户端上线或者下线时,通知其他在线人员

2)A 发送的消息,其他人员都可见

设计思路

客户端与服务端建立连接后,会触发服务端处理器(MyChatServerHandler)中的 channelActive 方法,此时把该连接对应的 channel 保存到 ChannelGroup 中;当某个客户端给服务端发送消息时,服务端遍历 ChannelGroup,向其中的每一个 channel 都发送一遍这条消息,就实现了群发功能。

代码实现(亲测可用)

pom

<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>

</dependencies>

MyChatServerHandler

package mychat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * Server-side inbound handler of the chat room demo.
 *
 * <p>Every connected client channel is kept in one shared {@link ChannelGroup}. A text message
 * received from a client is written to every channel in that group, and join/leave notices are
 * written to the channels that are in the group at that moment.
 *
 * @author CBeann
 * @create 2019-10-16 15:55
 */
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    /** All client channels currently registered with the server; shared by every handler instance. */
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Broadcasts a received message to every channel in the group.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param s the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel sender = channelHandlerContext.channel();

        // The sender gets an echo marked as its own message; everybody else
        // sees the message prefixed with the sender's remote address.
        String forOthers = sender.remoteAddress() + "说===>" + s;
        String forSender = "自己说===>" + s;

        for (Channel member : channelGroup) {
            member.writeAndFlush(member == sender ? forSender : forOthers);
        }
    }

    /**
     * Announces a newly connected client to the existing members and then registers it.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String notice = "[服务端] " + channel.remoteAddress() + "通道激活";
        System.out.println(notice);
        // Notify first, add afterwards, so the newcomer does not receive its own join notice.
        channelGroup.writeAndFlush(notice);
        channelGroup.add(channel);
    }

    /**
     * Unregisters a disconnected client and announces its departure to the remaining members.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Remove before notifying so nothing is written to the channel that just went away.
        channelGroup.remove(channel);
        String notice = "[服务端] " + channel.remoteAddress() + "通道关闭";
        // Mirror channelActive: show the notice on the server console as well.
        System.out.println(notice);
        channelGroup.writeAndFlush(notice);
    }

    /**
     * Reports the failure on the server console and closes the affected connection.
     *
     * @param ctx context of the channel on which the error occurred
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        // Include the cause so the reason for the failure is not lost.
        System.out.println("[服务端] " + channel.remoteAddress() + "出现异常: " + cause);
        ctx.close();
    }
}

MyChatServer

package mychat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * Entry point of the chat room server: binds a Netty server to {@link #PORT} and installs the
 * string codec plus {@link MyChatServerHandler} on every accepted connection.
 *
 * @author CBeann
 * @create 2019-10-16 15:51
 */
public class MyChatServer {

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // bytes <-> String conversion, followed by the chat handler itself.
                            // NOTE(review): there is no frame decoder in front of StringDecoder, so
                            // messages may be split or merged by TCP; consider a delimiter-based
                            // frame decoder if client and server agree on a delimiter.
                            socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new MyChatServerHandler());
                        }
                    });

            // Bind the port and wait until the bind has completed.
            ChannelFuture f = b.bind(PORT).sync();
            // Block until the listening channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            System.err.println("[服务端] 线程被中断");
        } catch (Exception e) {
            // E.g. the port is already in use; previously this failed silently.
            System.err.println("[服务端] 运行出错: " + e);
        } finally {
            // Graceful shutdown releases the event loop threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyChatClientHandler

package mychat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler of the chat room demo: prints every message received from the
 * server to standard output.
 *
 * @author CBeann
 * @create 2019-10-16 21:23
 */
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints a message received from the server.
     *
     * @param channelHandlerContext context of the client channel
     * @param s the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }

    /**
     * Reports the failure on the console and closes the connection.
     *
     * @param ctx context of the client channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Include the cause so the reason for the failure is visible to the user.
        System.out.println("[客户端] 出现异常: " + cause);
        ctx.close();
    }
}

MyChatClient

package mychat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

/**
 * Console client of the chat room demo: connects to the server, sends every line typed on
 * standard input, and prints incoming messages through {@link MyChatClientHandler}.
 *
 * <p>Typing {@code exit} (or reaching end of input) closes the connection and ends the program.
 *
 * @author CBeann
 * @create 2019-10-16 21:23
 */
public class MyChatClient {

    /**
     * Connects to the chat server and forwards console input until the user quits.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; failures are reported on standard error
     */
    public static void main(String[] args) throws Exception {
        int port = 8888;
        String host = "127.0.0.1";
        // Event loop group that drives the client's NIO channel.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // bytes <-> String conversion, followed by the handler that prints
                            // messages coming from the server.
                            socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new MyChatClientHandler());
                        }
                    });

            // Connect and wait until the connection attempt has completed.
            ChannelFuture f = b.connect(host, port).sync();

            // Forward console lines until the user types "exit", input ends, or the
            // connection is gone. System.in is deliberately not closed.
            Scanner reader = new Scanner(System.in);
            while (f.channel().isActive() && reader.hasNextLine()) {
                String body = reader.nextLine();
                if ("exit".equals(body)) {
                    break;
                }
                f.channel().writeAndFlush(body);
            }

            // Actively close the connection; without this the wait below would never
            // return after "exit" and the client would hang.
            f.channel().close();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            System.err.println("[客户端] 线程被中断");
        } catch (Exception e) {
            // E.g. the server is not reachable; previously this failed silently.
            System.err.println("[客户端] 运行出错: " + e);
        } finally {
            // Graceful shutdown releases the event loop threads.
            group.shutdownGracefully();
        }
    }
}

常见问题

1)IDEA怎么把一个启动类同时运行多次

先运行一下程序,再按照下面的操作进行

​​Idea中一个项目同时运行多个实例_EricXiao666的博客-idea运行多个项目​​


举报

相关推荐

0 条评论