0
点赞
收藏
分享

微信扫一扫

《Netty权威指南》(二) Http协议开发

文章目录

3.1 Http协议开发

3.1.1 基本知识

  • HTTP(超文本传输协议)协议是建立在TCP传输协议之上的应用层协议,它的发展是万维网协会和Internet工作小组IETF合作的结果。HTTP是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统。它于1990年提出,经过多年的使用和发展,得到了不断地完善和扩展。
  • 由于 HTTP协议是目前Web开发的主流协议,基于 HTTP的应用非常广泛,因此,掌握HTTP的开发非常重要。
  • HTTP是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统。
  • HTTP协议的主要特点如下。支持Client/Server模式。
  • 简单——客户向服务器请求服务时,只需指定服务URL,携带必要的请求参数或中的者消息体。
  • 灵活——HTTP允许传输任意类型的数据对象,传输的内容类型由 HTTP消息头中的Content-Type加以标记。
  • 无状态——HTTP协议是无状态协议,无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要之前的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快,负载较轻。

3.1.2 HTTP请求消息(HttpRequest)

  • 客户端发送一个HTTP请求到服务器的请求消息包括以下格式:请求行(request line)、请求头部(header)、空行和请求数据四个部分组成,下图给出了请求报文的一般格式。

image.png

GET /hello.txt HTTP/1.1
User-Agent: curl/7.16.3 libcurl/7.16.3 OpenSSL/0.9.7l zlib/1.2.3
Host: www.example.com
Accept-Language: en, mi

请求方法
根据HTTP标准,HTTP请求可以使用多种请求方法。
HTTP1.0定义了三种请求方法: GET, POST 和 HEAD方法。
HTTP1.1新增了五种请求方法:OPTIONS, PUT, DELETE, TRACE 和 CONNECT 方法。

序号方法描述
1GET请求指定的页面信息,并返回实体主体。
2HEAD类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
3POST向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在请求体中。POST请求可能会导致新的资源的建立和/或已有资源的修改。
4PUT从客户端向服务器传送的数据取代指定的文档的内容。
5DELETE请求服务器删除指定的页面。
6CONNECTHTTP/1.1协议中预留给能够将连接改为管道方式的代理服务器。
7OPTIONS允许客户端查看服务器的性能。
8TRACE回显服务器收到的请求,主要用于测试或诊断。

消息头
image.png

3.1.3 HTTP响应消息(HttpResponse)

  • HTTP响应也由四个部分组成,分别是:状态行、消息报头、空行和响应正文。

image.png
状态码
当浏览者访问一个网页时,浏览者的浏览器会向网页所在服务器发出请求。当浏览器接收并显示网页前,此网页所在的服务器会返回一个包含HTTP状态码的信息头(server header)用以响应浏览器的请求。
HTTP状态码的英文为HTTP Status Code。
下面是常见的HTTP状态码:

  • 200 - 请求成功
  • 301 - 资源(网页等)被永久转移到其它URL
  • 404 - 请求的资源(网页等)不存在
  • 500 - 内部服务器错误

image.png
响应头消息

Header解释示例
Accept-Ranges表明服务器是否支持指定范围请求及哪种类型的分段请求Accept-Ranges: bytes
Age从原始服务器到代理缓存形成的估算时间(以秒计,非负)Age: 12
Allow对某网络资源的有效的请求行为,不允许则返回405Allow: GET, HEAD
Cache-Control告诉所有的缓存机制是否可以缓存及哪种类型Cache-Control: no-cache
Content-Encodingweb服务器支持的返回内容压缩编码类型。Content-Encoding: gzip
Content-Language响应体的语言Content-Language: en,zh
Content-Length响应体的长度Content-Length: 348
Content-Location请求资源可替代的备用的另一地址Content-Location: /index.htm
Content-MD5返回资源的MD5校验值Content-MD5: Q2hlY2sgSW50ZWdyaXR5IQ==
Content-Range在整个返回体中本部分的字节位置Content-Range: bytes 21010-47021/47022
Content-Type返回内容的MIME类型Content-Type: text/html; charset=utf-8
Date原始服务器消息发出的时间Date: Tue, 15 Nov 2010 08:12:31 GMT
ETag请求变量的实体标签的当前值ETag: “737060cd8c284d8af7ad3082f209582d”
Expires响应过期的日期和时间Expires: Thu, 01 Dec 2010 16:00:00 GMT
Last-Modified请求资源的最后修改时间Last-Modified: Tue, 15 Nov 2010 12:45:26 GMT
Location用来重定向接收方到非请求URL的位置来完成请求或标识新的资源Location: http://www.zcmhi.com/archives/94.html
Pragma包括实现特定的指令,它可应用到响应链上的任何接收方Pragma: no-cache
Proxy-Authenticate它指出认证方案和可应用到代理的该URL上的参数Proxy-Authenticate: Basic
refresh应用于重定向或一个新的资源被创造,在5秒之后重定向(由网景提出,被大部分浏览器支持)

Refresh: 5; url=
http://www.zcmhi.com/archives/94.html |
| Retry-After | 如果实体暂时不可取,通知客户端在指定时间之后再次尝试 | Retry-After: 120 |
| Server | web服务器软件名称 | Server: Apache/1.3.27 (Unix) (Red-Hat/Linux) |
| Set-Cookie | 设置Http Cookie | Set-Cookie: UserID=JohnDoe; Max-Age=3600; Version=1 |
| Trailer | 指出头域在分块传输编码的尾部存在 | Trailer: Max-Forwards |
| Transfer-Encoding | 文件传输编码 | Transfer-Encoding:chunked |
| Vary | 告诉下游代理是使用缓存响应还是从原始服务器请求 | Vary: * |
| Via | 告知代理客户端响应是通过哪里发送的 | Via: 1.0 fred, 1.1 nowhere.com (Apache/1.1) |
| Warning | 警告实体可能存在的问题 | Warning: 199 Miscellaneous warning |
| WWW-Authenticate | 表明客户端请求实体应该使用的授权方案 | WWW-Authenticate: Basic |

3.1.4 Http开发

netty天生异步事件驱动的架构,无论是在性能上还是在可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat,Jetty等Web容器,更加的轻量和小巧、灵活性和定制性也更好。
我们以文件服务器为例学习Netty的HTTP服务端入门开发,例程场景如下:

  • 文件服务器使用HTTP协议对外提供服务
  • 当客户端通过浏览器访问文件服务器时,对访问路径进行检查,检查失败返回403
  • 检查通过,以链接的方式打开当前文件目录,每个目录或者都是个超链接,可以递归访问
  • 如果是目录,可以继续递归访问它下面的目录或者文件,如果是文件并且可读,则可以在浏览器端直接打开,或者通过[目标另存为]下载
package com.shu.HttpProtocol;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 9:53
 * @Description Http文件请求
 **/

public class HttpFileServer {

    private static final String DEFAULT_URL = "/";

    public void run(final int port, final String url) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast("log",
                                    new LoggingHandler(LogLevel.DEBUG));//日志处理
                            ch.pipeline().addLast("http-decoder",
                                    new HttpRequestDecoder()); // 请求消息解码器
                            ch.pipeline().addLast("http-aggregator",
                                    new HttpObjectAggregator(65536));// 目的是将多个消息转换为单一的request或者response对象
                            ch.pipeline().addLast("http-encoder",
                                    new HttpResponseEncoder());//响应解码器
                            ch.pipeline().addLast("http-chunked",
                                    new ChunkedWriteHandler());//目的是支持异步大文件传输()
                            ch.pipeline().addLast("fileServerHandler",
                                    new HttpFileServerHandler(url));// 业务逻辑
                        }
                    });
            ChannelFuture future = b.bind("127.0.0.1", port).sync();
            System.out.println("HTTP文件目录服务器启动,网址是 : " + "http://127.0.0.1:"
                    + port + url);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        String url = DEFAULT_URL;
        if (args.length > 1)
            url = args[1];
        new HttpFileServer().run(port, url);
    }
}

package com.shu.HttpProtocol;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 9:55
 * @Description 文件请求自定义处理器
 **/

public class HttpFileServerHandler extends
        SimpleChannelInboundHandler<FullHttpRequest> {
    private final String url;

    public HttpFileServerHandler(String url) {
        this.url = url;
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        /*如果无法解码400*/
        if (!request.decoderResult().isSuccess()) {
            sendError(ctx, BAD_REQUEST);
            return;
        }

        /*只支持GET方法*/
        if (request.method() != GET) {
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }

        final String uri = request.uri();
        /*格式化URL,并且获取路径*/
        final String path = sanitizeUri(uri);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        File file = new File(path);
        /*如果文件不可访问或者文件不存在*/
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        /*如果是目录*/
        if (file.isDirectory()) {
            //1. 以/结尾就列出所有文件
            if (uri.endsWith("/")) {
                sendListing(ctx, file);
            } else {
                //2. 否则自动+/
                sendRedirect(ctx, uri + '/');
            }
            return;
        }
        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");// 以只读的方式打开文件
        } catch (FileNotFoundException fnfe) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        long fileLength = randomAccessFile.length();
        //创建一个默认的HTTP响应
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        //设置Content Length
        HttpUtil.setContentLength(response, fileLength);
        //设置Content Type
        setContentTypeHeader(response, file);
        //如果request中有KEEP ALIVE信息
        if (HttpUtil.isKeepAlive(request)) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.write(response);
        ChannelFuture sendFileFuture;
        //通过Netty的ChunkedFile对象直接将文件写入发送到缓冲区中
        sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0,
                fileLength, 8192), ctx.newProgressivePromise());
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future,
                                            long progress, long total) {
                if (total < 0) { // total unknown
                    System.err.println("Transfer progress: " + progress);
                } else {
                    System.err.println("Transfer progress: " + progress + " / "
                            + total);
                }
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future)
                    throws Exception {
                System.out.println("Transfer complete.");
            }
        });
        ChannelFuture lastContentFuture = ctx
                .writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        //如果不支持keep-Alive,服务器端主动关闭请求
        if (!HttpUtil.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }

    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");


    private String sanitizeUri(String uri) {
        try {
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            try {
                uri = URLDecoder.decode(uri, "ISO-8859-1");
            } catch (UnsupportedEncodingException e1) {
                throw new Error();
            }
        }
        if (!uri.startsWith(url)) {
            return null;
        }
        if (!uri.startsWith("/")) {
            return null;
        }
        uri = uri.replace('/', File.separatorChar);
        if (uri.contains(File.separator + '.')
                || uri.contains('.' + File.separator) || uri.startsWith(".")
                || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
        return System.getProperty("user.dir") + File.separator + uri;
    }

    private static final Pattern ALLOWED_FILE_NAME = Pattern
            .compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");

    private static void sendListing(ChannelHandlerContext ctx, File dir) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
        StringBuilder buf = new StringBuilder();
        String dirPath = dir.getPath();
        buf.append("<!DOCTYPE html>\r\n");
        buf.append("<html><head><title>");
        buf.append(dirPath);
        buf.append(" 目录:");
        buf.append("</title></head><body>\r\n");
        buf.append("<h3>");
        buf.append(dirPath).append(" 目录:");
        buf.append("</h3>\r\n");
        buf.append("<ul>");
        buf.append("<li>链接:<a href=\"../\">..</a></li>\r\n");
        for (File f : dir.listFiles()) {
            if (f.isHidden() || !f.canRead()) {
                continue;
            }
            String name = f.getName();
            if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
                continue;
            }
            buf.append("<li>链接:<a href=\"");
            buf.append(name);
            buf.append("\">");
            buf.append(name);
            buf.append("</a></li>\r\n");
        }
        buf.append("</ul></body></html>\r\n");
        ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
        response.content().writeBytes(buffer);
        buffer.release();
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
        response.headers().set(HttpHeaderNames.LOCATION, newUri);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private static void sendError(ChannelHandlerContext ctx,
                                  HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                status, Unpooled.copiedBuffer("Failure: " + status.toString()
                + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private static void setContentTypeHeader(HttpResponse response, File file) {
        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
        response.headers().set(HttpHeaderNames.CONTENT_TYPE,
                mimeTypesMap.getContentType(file.getPath()));
    }
}

image.png

  • HttpServerCodec:作为服务器,使用 HttpServerCodec 作为编码器与解码器
  • HttpObjectAggregator:它的作用是将多个消息转换为单一的FullHttpRequest或者FullHttpResponse,原因是HTTP解码器在每个HTTP消息中会生成多个消息对象。
  • ChunkedWriteHandler:它的主要作用是支持异步发送大的码流(例如大的文件传输),但不占用过多的内存,防止发生Java内存溢出错误。

3.1.5 Netty HTTP+Json协议栈开发

image.png
image.png

  • 实体类
package com.shu.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 20:41
 * @Description 地址
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Address {
    private String street; // 街道
    private String city; // 城市
    private String province; //省份
    private String postcode; //邮编
    private String state; // 国家
}


package com.shu.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 20:58
 * @Description 顾客
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Customer {
    private int id;
    private String userName;
    private String phone;
}


package com.shu.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 20:56
 * @Description 订单
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private int count;
    private Customer customer;
    private Address address;
    private Shipping shipping;
    private Float countPrice;
}

package com.shu.pojo;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 20:52
 * @Description 寄送方式
 **/
public enum Shipping {

    A(0, "普通邮寄"),
    B(100, "宅急送"),
    C(-100, "国际快递"),
    D(200, "国内快递");

    private int code;
    private String desc;

    Shipping(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }
}

  • 抽象 Http Json 编码器
package com.shu.codec;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:04
 * @Description 抽象 Http Json 编码器
 **/
@Slf4j
public abstract class AbstractHttpJsonEncoder<T>  extends MessageToMessageEncoder<T> {

    final static Charset UTF_8 = StandardCharsets.UTF_8;
    //编码
    protected ByteBuf encode0(ChannelHandlerContext ctx, Object body) {
        log.info("开始编码:把消息转换成成ByteBuf");
        //将消息转换为JSON
        String jsonStr = JSON.toJSONString(body);
        // 转换成ByteBuf
        return  Unpooled.copiedBuffer(jsonStr, UTF_8);
    }
}
  • 抽象 Http Json 解码器
package com.shu.codec;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:15
 * @Description 抽象 Http Json 解码器
 **/
public abstract class AbstractHttpJsonDecoder<T> extends MessageToMessageDecoder<T> {

    private final Class<?> clazz;
    private final boolean isPrint;
    private final static Charset UTF_8 = StandardCharsets.UTF_8;


    protected AbstractHttpJsonDecoder(Class<?> clazz) {
        this(clazz, false);
    }

    protected AbstractHttpJsonDecoder(Class<?> clazz, boolean isPrint) {
        this.clazz = clazz;
        this.isPrint = isPrint;
    }

    // 解码
    protected Object decode0(ChannelHandlerContext ctx, ByteBuf body) {
        String content = body.toString(UTF_8);

        if (isPrint)
            System.out.println("The body is : " + content);

        Object result = JSON.parseObject(content, clazz);
        return result;
    }


}
  • 请求实体类
package com.shu.codec;

import io.netty.handler.codec.http.FullHttpRequest;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:02
 * @Description Http Json 请求
 **/
public class HttpJsonRequest {
    // 完整的Http请求
    private FullHttpRequest request;
    // 消息体
    private Object body;

    public HttpJsonRequest(FullHttpRequest request, Object body) {
        this.request = request;
        this.body = body;
    }

    /**
     * @return the request
     */
    public final FullHttpRequest getRequest() {
        return request;
    }

    /**
     * @param request the request to set
     */
    public final void setRequest(FullHttpRequest request) {
        this.request = request;
    }

    /**
     * @return the object
     */
    public final Object getBody() {
        return body;
    }

    /**
     * @param object the object to set
     */
    public final void setBody(Object body) {
        this.body = body;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "HttpJsonRequest [request=" + request + ", body =" + body + "]";
    }
}

  • 响应实体类
package com.shu.codec;

import io.netty.handler.codec.http.FullHttpResponse;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:03
 * @Description Http Json 响应
 **/
public class HttpJsonResponse {
    private FullHttpResponse httpResponse;
    private Object result;

    public HttpJsonResponse(FullHttpResponse httpResponse, Object result) {
        this.httpResponse = httpResponse;
        this.result = result;
    }

    /**
     * @return the httpResponse
     */
    public final FullHttpResponse getHttpResponse() {
        return httpResponse;
    }

    /**
     * @param httpResponse the httpResponse to set
     */
    public final void setHttpResponse(FullHttpResponse httpResponse) {
        this.httpResponse = httpResponse;
    }

    /**
     * @return the body
     */
    public final Object getResult() {
        return result;
    }

    /**
     * @param body the body to set
     */
    public final void setResult(Object result) {
        this.result = result;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "HttpJsonResponse [httpResponse=" + httpResponse + ", result="
                + result + "]";
    }
}
  • 请求编解码
package com.shu.codec;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;

import java.util.List;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:17
 * @Description Http Json 请求解码器
 **/
public class HttpJsonRequestDecoder extends AbstractHttpJsonDecoder<FullHttpRequest> {

    public HttpJsonRequestDecoder(Class<?> clazz) {
        this(clazz, false);
    }


    /**
     * 构造器
     *
     * @param clazz   解码的对象信息
     * @param isPrint 是否需要打印
     */
    public HttpJsonRequestDecoder(Class<?> clazz, boolean isPrint) {
        super(clazz, isPrint);
    }

    /**
     * @param ctx channel上下文
     * @param msg 消息
     * @param out 输出集合
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) throws Exception {
        if (!msg.decoderResult().isSuccess()) {
            sendError(ctx, HttpResponseStatus.BAD_REQUEST);
            return;
        }
        HttpJsonRequest request = new HttpJsonRequest(msg, decode0(ctx, msg.content()));
        out.add(request);
    }


    /**
     * 测试的话,直接封装,实战中需要更健壮的处理
     */
    private static void sendError(ChannelHandlerContext ctx,
                                  HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                status, Unpooled.copiedBuffer("Failure: " + status.toString()
                + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
package com.shu.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;

import java.net.InetAddress;
import java.util.List;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:19
 * @Description Http Json 请求编码器
 **/
public class HttpJsonRequestEncoder extends AbstractHttpJsonEncoder<HttpJsonRequest> {
    /**
     * 把请求消息封装成Http报文->请求行(request line)、请求头部(header)、空行和请求数据
     * @param ctx
     * @param msg
     * @param out
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, HttpJsonRequest msg, List<Object> out) throws Exception {
        //(1)调用父类的encode0,将业务需要发送的对象转换为Json
        ByteBuf body = encode0(ctx, msg.getBody());
        //(2) 如果业务自定义了HTTP消息头,则使用业务的消息头,否则在这里构造HTTP消息头
        // 这里使用硬编码的方式来写消息头,实际中可以写入配置文件
        FullHttpRequest request = msg.getRequest();
        if (request == null) {
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                    HttpMethod.GET, "/do", body);
            HttpHeaders headers = request.headers();
            headers.set(HttpHeaderNames.HOST, InetAddress.getLocalHost()
                    .getHostAddress());
            headers.set(HttpHeaderNames.CONNECTION, HttpHeaders.Values.CLOSE);
            headers.set(HttpHeaderNames.ACCEPT_ENCODING,
                    HttpHeaderValues.GZIP.toString() + ','
                            + HttpHeaderValues.DEFLATE.toString());
            headers.set(HttpHeaderNames.ACCEPT_CHARSET,
                    "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
            headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh");
            headers.set(HttpHeaderNames.USER_AGENT,
                    "Netty json Http Client side");
            headers.set(HttpHeaderNames.ACCEPT,
                    "text/html,application/json;q=0.9,*/*;q=0.8");
        }
        HttpUtil.setContentLength(request, body.readableBytes());
        // (3) 编码后的对象
        out.add(request);
    }
}
  • 响应编解码
package com.shu.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;

import java.util.List;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:19
 * @Description Http Json 响应解码器
 **/
public class HttpJsonResponseDecoder extends AbstractHttpJsonDecoder<FullHttpResponse> {

    public HttpJsonResponseDecoder(Class<?> clazz) {
        this(clazz, false);
    }

    /**
     * 构造器
     *
     * @param clazz   解码的对象信息
     * @param isPrint 是否需要打印
     */
    public HttpJsonResponseDecoder(Class<?> clazz, boolean isPrint) {
        super(clazz, isPrint);
    }

    /**
     * @param ctx channel上下文
     * @param msg 消息
     * @param out 输出集合
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, FullHttpResponse msg, List<Object> out) throws Exception {
        System.out.println("开始解码...");
        out.add(
                new HttpJsonResponse(msg, decode0(ctx, msg.content()))
        );
    }


}
package com.shu.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpUtil;

import java.util.List;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:18
 * @Description Http Json 响应编码器 编码成为HTTP响应报文 状态行、消息报头、空行和响应正文。
 **/
public class HttpJsonResponseEncoder extends AbstractHttpJsonEncoder<HttpJsonResponse> {
    @Override
    protected void encode(ChannelHandlerContext ctx, HttpJsonResponse msg, List<Object> out) throws Exception {
        //编码
        ByteBuf body = encode0(ctx, msg.getResult());
        FullHttpResponse response = msg.getHttpResponse();
        if (response == null) {
            response = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
        } else {
            response = new DefaultFullHttpResponse(msg.getHttpResponse()
                    .protocolVersion(), msg.getHttpResponse().status(),
                    body);
        }
        response.headers().set(CONTENT_TYPE, "text/json");
        HttpUtil.setContentLength(response, body.readableBytes());
        out.add(response);
    }

}
  • 服务端
package com.shu.server;

import com.shu.codec.HttpJsonRequestDecoder;
import com.shu.codec.HttpJsonResponseEncoder;
import com.shu.pojo.Order;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:20
 * @Description
 **/
public class HttpJsonServer {
    public void run(final int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            //接收HttpJsonRequest,需要对应解码器
                            //ByteBuf->FullHttpRequest-> HttpJsonRequestDecoder
                            //输出HttpJsonResponse,需要对应编码器
                            //HttpResponseEncoder->FullHttpResponse-> HttpJsonResponseEncoder
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("json-decoder", new HttpJsonRequestDecoder(Order.class, true));
                            ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                            ch.pipeline().addLast("json-encoder", new HttpJsonResponseEncoder());
                            ch.pipeline().addLast("jsonServerHandler", new HttpJsonServerHandler());
                        }
                    });
            ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
            System.out.println("HTTP订购服务器启动,网址是 : " + "http://localhost:"
                    + port);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new HttpJsonServer().run(port);
    }
}
package com.shu.server;

import com.shu.codec.HttpJsonRequest;
import com.shu.codec.HttpJsonResponse;
import com.shu.pojo.Address;
import com.shu.pojo.Order;
import com.shu.pojo.Shipping;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.ArrayList;
import java.util.List;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.rtsp.RtspResponseStatuses.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:20
 * @Description
 **/
public class HttpJsonServerHandler extends SimpleChannelInboundHandler<HttpJsonRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpJsonRequest msg) throws Exception {
        HttpRequest request = msg.getRequest();
        Order order = (Order) msg.getBody();
        System.out.println("Http server receive request : " + order);
        // 返回客服端的消息
        ChannelFuture future = ctx.writeAndFlush(new HttpJsonResponse(null, order));
        if (!HttpUtil.isKeepAlive(request)) {
            future.addListener(new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future future) throws Exception {
                    ctx.close();
                }
            });
        }
    }



    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }


    private static void sendError(ChannelHandlerContext ctx,
                                  HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                status, Unpooled.copiedBuffer("失败: " + status.toString()
                + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
  • 客服端
package com.shu.client;

import com.shu.codec.HttpJsonRequestEncoder;
import com.shu.codec.HttpJsonResponseDecoder;
import com.shu.pojo.Order;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:22
 * @Description Http客户端
 **/
public class HttpJsonClient {
    public void connect(int port) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast("http-decoder",
                                    new HttpResponseDecoder());
                            ch.pipeline().addLast("http-aggregator",
                                    new HttpObjectAggregator(65536));
                            // json解码器
                            ch.pipeline().addLast("json-decoder", new HttpJsonResponseDecoder(Order.class, true));
                            ch.pipeline().addLast("http-encoder",
                                    new HttpRequestEncoder());
                            ch.pipeline().addLast("json-encoder",
                                    new HttpJsonRequestEncoder());
                            ch.pipeline().addLast("jsonClientHandler",
                                    new HttpJsonClientHandler());
                        }
                    });

            // 发起异步连接操作
            ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();

            // 当代客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new HttpJsonClient().connect(port);
    }
}

package com.shu.client;

import com.shu.codec.HttpJsonRequest;
import com.shu.pojo.Address;
import com.shu.pojo.Customer;
import com.shu.pojo.Order;
import com.shu.pojo.Shipping;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @Author shu
 * @Version 1.0
 * @Date: 2022/03/07/ 21:23
 * @Description 自定义处理器
 **/
public class HttpJsonClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 连接刚建立
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接上服务器...");
        // 向服务器发送数据
        HttpJsonRequest request = new HttpJsonRequest(null, new Order(2, new Customer(1,"ADMIN","123456"),new Address("四川","四川","四川","四川","四川"), Shipping.A,125f));
        // 写入数据
        ctx.writeAndFlush(request);
    }

    /**
     * 读取服务器返回的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.getClass().getName());
        System.out.println("接收到了数据..." + msg);
    }



    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

  • 观察结果

首先客服端发送请求报文
image.png
服务端收到请求报文
image.png
服务端返回响应
image.png
客户端接受响应
image.png

举报

相关推荐

0 条评论