0
点赞
收藏
分享

微信扫一扫

从IO模型到Netty笔记(五)

编程练习生J 2022-03-12 阅读 56
javanetty

心跳机制

重点:pipeline.addLast(new IdleStateHandler(3,5,12, TimeUnit.SECONDS));

public class NettyServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup,workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new IdleStateHandler(3,5,12, TimeUnit.SECONDS));
                            pipeline.addLast(new NettyHandler());
                        }
                    });
            ChannelFuture channelFuture = sb.bind(6668).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

handler重点:重写userEventTriggered方法

public class NettyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            switch (((IdleStateEvent)evt).state()){
                case ALL_IDLE:
                    System.out.println("读写超时");
                    break;
                case READER_IDLE:
                    System.out.println("读超时");
                    break;
                case WRITER_IDLE:
                    System.out.println("写超时");
                    break;
            }
        }
    }
}

实现webSocket

public class WsSever {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup,workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 基于http协议
                            pipeline.addLast(new HttpServerCodec());
                            // 以块方式写
                            pipeline.addLast(new ChunkedWriteHandler());
                            // http传输是分段的,所以最后需要将其聚合
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            // 以帧传送 请求地址为"ws://localhost:6668/hello"
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            // 处理器
                            pipeline.addLast(new WsHandler());
                        }
                    });
            ChannelFuture channelFuture = sb.bind(6668).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

handler

// TextWebSocketFrame 表示文本帧
public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("服务器收到消息:"+ textWebSocketFrame.text());
        channelHandlerContext.pipeline().writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("-------"+ctx.channel().id().asLongText()+"-------");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("=========="+ctx.channel().id().asLongText()+"==========");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生"+cause.getMessage());
    }
}

html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<script>
    var socket;
    // 判断当前浏览器是否支持webSocket编程
    if(window.WebSocket){
        socket = new WebSocket("ws://127.0.0.1:6668/hello");
        // 相当于channelRead0,ev 收到服务器端回送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + '\n' + ev.data;
        };
        socket.onopen = function (ev) {
            // 相当于连接开启
            var rt = document.getElementById("responseText");
            rt.value = "连接开启了..."
        }
        socket.onclose = function (ev) {
            // 连接关闭
            var rt = document.getElementById("responseText");
            rt.value = rt.value +"连接关闭了..."
        }
    }else {
        alert("当年浏览器不支持webSocket")
    }

    function send(message) {
        if(!window.socket){
            return;
        }
        if(socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接未开启");
        }
    }
</script>
<body>
    <form onsubmit="return false">
        <label>
            <textarea name="message" style="height: 300px; width: 300px" ></textarea>
        </label>
        <input type="button" value="发送消息" onclick="send(this.form.message.value)">
        <label>
            <textarea id="responseText" style="height: 300px; width: 300px" ></textarea>
        </label>
        <input type="button" value="清空内容"
               onclick="document.getElementById('responseText').value=''">
    </form>


</body>
</html>

编解码

netty提供的编解码有

  • StringDecoder
  • StringEncoder
  • ObjectDecoder
  • ObjectEncoder

但是效率低下,无法跨语言。

我们一般使用protobuf进行编解码,效率高,跨语言。

protobuf使用案例:

引入依赖

创建.proto文件

 下载proto的转换工具

https://github.com/protocolbuffers/protobuf/releases

开始转换

方法一:

 

 运行之后就多了一个文件

 方法二

用插件生成,把proto文件放在main的proto文件下

        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

客户端编码发送

pipeline中加入对应的编码器

public class CodecClient {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ProtobufEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new ProClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

handler中发送

public class ProClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(2).setName("小L").build();
        ctx.channel().writeAndFlush(student);
    }
}

服务端解码接收

pipeline中加入解码处理类型

public class NettyCodec {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup,workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
                            pipeline.addLast(new ServerProto());
                        }
                    });
            ChannelFuture channelFuture = sb.bind(6668).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

handler接收显示

public class ServerProto extends SimpleChannelInboundHandler<StudentPOJO.Student> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, StudentPOJO.Student student) throws Exception {
        System.out.println(student.getId());
        System.out.println(student.getName());
    }
}
举报

相关推荐

从IO模型到Netty笔记(四)

Netty学习笔记(一) IO

Netty之IO模型&通信机制

五种IO模型

Linux 五种IO模型

五种网络IO模型

五种常见IO模型

0 条评论