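3.1 HTTP Protocol Development
3.1.1 Basics
- HTTP (Hypertext Transfer Protocol) is an application-layer protocol built on top of TCP. It grew out of cooperation between the World Wide Web Consortium and the Internet Engineering Task Force (IETF). HTTP is an object-oriented application-layer protocol; because it is simple and fast, it suits distributed hypermedia information systems. It was proposed in 1990 and has been refined and extended over many years of use.
- HTTP is the mainstream protocol for Web development today and HTTP-based applications are everywhere, so knowing how to develop against HTTP matters.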
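- The main characteristics of HTTP are listed below. It supports the client/server model.
- Simple: to request a service, the client only has to specify the service URL and carry the necessary request parameters or a message body.
- Flexible: HTTP can transfer data objects of any type; the type of the content is marked by the Content-Type header of the HTTP message.
- Stateless: HTTP is a stateless protocol, meaning the protocol keeps no memory of earlier transactions. If later processing needs earlier information, that information has to be sent again, which can increase the amount of data transferred per connection. On the other hand, when the server does not need earlier information it responds faster and carries less load.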
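3.1.2 HTTP Request Message (HttpRequest)
- An HTTP request sent from a client to a server consists of four parts: the request line, the request headers, a blank line, and the request data. A typical request looks like this: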
GET /hello.txt HTTP/1.1
User-Agent: curl/7.16.3 libcurl/7.16.3 OpenSSL/0.9.7l zlib/1.2.3
Host: www.example.com
Accept-Language: en, mi
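The four-part layout described above can be illustrated with a small parser. This is a minimal sketch using only the JDK; the class name `RawHttpRequest` is hypothetical, and it assumes well-formed input with CRLF line endings and no folded header lines.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: splits a raw HTTP/1.x request into its four parts. */
final class RawHttpRequest {
    final String method;
    final String target;
    final String version;
    final Map<String, String> headers = new LinkedHashMap<>();
    final String body;

    private RawHttpRequest(String method, String target, String version, String body) {
        this.method = method;
        this.target = target;
        this.version = version;
        this.body = body;
    }

    static RawHttpRequest parse(String raw) {
        // The blank line (CRLF CRLF) separates the header section from the body.
        int split = raw.indexOf("\r\n\r\n");
        String head = split >= 0 ? raw.substring(0, split) : raw;
        String body = split >= 0 ? raw.substring(split + 4) : "";
        String[] lines = head.split("\r\n");

        // Request line: METHOD SP request-target SP HTTP-version
        String[] requestLine = lines[0].split(" ", 3);
        if (requestLine.length != 3) {
            throw new IllegalArgumentException("Malformed request line: " + lines[0]);
        }
        RawHttpRequest request =
                new RawHttpRequest(requestLine[0], requestLine[1], requestLine[2], body);

        // Every following line is a "Name: value" header.
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Malformed header: " + lines[i]);
            }
            // Header names are case-insensitive, so they are stored lower-cased.
            String name = lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT);
            request.headers.put(name, lines[i].substring(colon + 1).trim());
        }
        return request;
    }
}
```

Feeding it the sample request yields the method `GET`, the target `/hello.txt`, three headers, and an empty body.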
Request methods
The HTTP standard defines several request methods.
HTTP/1.0 defined three request methods: GET, POST, and HEAD.
HTTP/1.1 added five more: OPTIONS, PUT, DELETE, TRACE, and CONNECT.
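No. | Method | Description |
---|---|---|
1 | GET | Requests the specified resource and returns the entity body. |
2 | HEAD | Like GET, but the response carries no body; used to fetch the headers only. |
3 | POST | Submits data to the specified resource for processing (for example a form submission or a file upload). The data is carried in the request body. A POST may create a new resource and/or modify existing ones. |
4 | PUT | Replaces the content of the specified document with the data sent by the client. |
5 | DELETE | Asks the server to delete the specified resource. |
6 | CONNECT | Reserved in HTTP/1.1 for proxies that can switch the connection into a tunnel. |
7 | OPTIONS | Lets the client query which communication options (such as the allowed methods) the server supports. |
8 | TRACE | Echoes back the request received by the server; mainly used for testing or diagnostics. |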
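As a small illustration of the table, the sketch below records which protocol version introduced each method, following the grouping given in the text. The class name `HttpMethodVersions` is hypothetical, and methods outside the table (PATCH, for example) are deliberately reported as unknown.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical lookup that mirrors the method table above. */
final class HttpMethodVersions {
    private static final List<String> HTTP_1_0 =
            Arrays.asList("GET", "POST", "HEAD");
    private static final List<String> HTTP_1_1_ADDED =
            Arrays.asList("OPTIONS", "PUT", "DELETE", "TRACE", "CONNECT");

    /** Returns the version that introduced the method, or null if it is not in the table. */
    static String introducedIn(String method) {
        // Method tokens are case-sensitive in HTTP, so the input is not normalised.
        if (HTTP_1_0.contains(method)) {
            return "HTTP/1.0";
        }
        if (HTTP_1_1_ADDED.contains(method)) {
            return "HTTP/1.1";
        }
        return null;
    }
}
```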
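Message headers
3.1.3 HTTP Response Message (HttpResponse)
- An HTTP response also consists of four parts: the status line, the message headers, a blank line, and the response body.
Status codes
When a user visits a web page, the browser sends a request to the server hosting the page. Before the browser receives and displays the page, the server returns a header block (the server header) that contains an HTTP status code in response to the request.
Some common HTTP status codes:
- 200 - The request succeeded
- 301 - The resource (web page, etc.) has moved permanently to another URL
- 404 - The requested resource (web page, etc.) does not exist
- 500 - Internal server error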
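The first digit of a status code gives its class, which is how the four codes above fall into success, redirection, client error, and server error. A minimal sketch of reading a status line and classifying the code follows; the class name `StatusLine` is hypothetical.

```java
/** Hypothetical helper: parses "HTTP-version SP status-code SP reason-phrase". */
final class StatusLine {
    final String version;
    final int code;
    final String reason;

    private StatusLine(String version, int code, String reason) {
        this.version = version;
        this.code = code;
        this.reason = reason;
    }

    static StatusLine parse(String line) {
        // Limit the split to 3 so that a reason phrase containing spaces stays intact.
        String[] parts = line.split(" ", 3);
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed status line: " + line);
        }
        String reason = parts.length == 3 ? parts[2] : "";
        return new StatusLine(parts[0], Integer.parseInt(parts[1]), reason);
    }

    /** Maps the first digit of the status code to its class. */
    String category() {
        switch (code / 100) {
            case 1: return "informational";
            case 2: return "success";
            case 3: return "redirection";
            case 4: return "client error";
            case 5: return "server error";
            default: return "unknown";
        }
    }
}
```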
Response headers
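Header | Description | Example |
---|---|---|
Accept-Ranges | Whether the server supports range requests, and in which unit | Accept-Ranges: bytes |
Age | Estimated time, in non-negative seconds, since the response left the origin server and was stored in a proxy cache | Age: 12 |
Allow | Request methods that are valid for the resource; other methods are answered with 405 | Allow: GET, HEAD |
Cache-Control | Tells every cache along the way whether, and how, the response may be cached | Cache-Control: no-cache |
Content-Encoding | Compression encoding applied to the returned content | Content-Encoding: gzip |
Content-Language | Language of the response body | Content-Language: en,zh |
Content-Length | Length of the response body in bytes | Content-Length: 348 |
Content-Location | An alternative address for the requested resource | Content-Location: /index.htm |
Content-MD5 | MD5 checksum of the returned resource | Content-MD5: Q2hlY2sgSW50ZWdyaXR5IQ== |
Content-Range | Byte positions of this part within the complete body | Content-Range: bytes 21010-47021/47022 |
Content-Type | MIME type of the returned content | Content-Type: text/html; charset=utf-8 |
Date | Time at which the origin server sent the message | Date: Tue, 15 Nov 2010 08:12:31 GMT |
ETag | Current value of the entity tag for the requested variant | ETag: "737060cd8c284d8af7ad3082f209582d" |
Expires | Date and time after which the response is considered stale | Expires: Thu, 01 Dec 2010 16:00:00 GMT |
Last-Modified | Time the requested resource was last modified | Last-Modified: Tue, 15 Nov 2010 12:45:26 GMT |
Location | Redirects the recipient to a URL other than the request URL to complete the request, or identifies a newly created resource | Location: http://www.zcmhi.com/archives/94.html |
Pragma | Implementation-specific directives that may apply to any recipient along the response chain | Pragma: no-cache |
Proxy-Authenticate | Authentication scheme and parameters that apply to the proxy for this URL | Proxy-Authenticate: Basic |
Refresh | Used for a redirect or when a new resource has been created; here, redirect after 5 seconds (introduced by Netscape, supported by most browsers) | Refresh: 5; url=http://www.zcmhi.com/archives/94.html |
Retry-After | If the entity is temporarily unavailable, tells the client to try again after the given time | Retry-After: 120 |
Server | Name of the web server software | Server: Apache/1.3.27 (Unix) (Red-Hat/Linux) |
Set-Cookie | Sets an HTTP cookie | Set-Cookie: UserID=JohnDoe; Max-Age=3600; Version=1 |
Trailer | Indicates that the listed header fields are present in the trailer of a chunked transfer | Trailer: Max-Forwards |
Transfer-Encoding | Transfer encoding of the message body | Transfer-Encoding: chunked |
Vary | Tells downstream proxies whether to use a cached response or to request it from the origin server | Vary: * |
Via | Tells the client through which proxies the response was sent | Via: 1.0 fred, 1.1 nowhere.com (Apache/1.1) |
Warning | Warns about possible problems with the entity | Warning: 199 Miscellaneous warning |
WWW-Authenticate | Authentication scheme the client should use for the requested entity | WWW-Authenticate: Basic |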
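To show how one of these header values is read in practice, here is a minimal sketch that parses the `Content-Range` example from the table. The class name `ContentRange` is hypothetical, and only the `bytes first-last/total` form is handled; forms with `*` are rejected.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for values such as "bytes 21010-47021/47022". */
final class ContentRange {
    private static final Pattern BYTE_RANGE =
            Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+)");

    final long first;
    final long last;
    final long total;

    private ContentRange(long first, long last, long total) {
        this.first = first;
        this.last = last;
        this.total = total;
    }

    static ContentRange parse(String headerValue) {
        Matcher m = BYTE_RANGE.matcher(headerValue.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported Content-Range: " + headerValue);
        }
        long first = Long.parseLong(m.group(1));
        long last = Long.parseLong(m.group(2));
        long total = Long.parseLong(m.group(3));
        // Both positions are inclusive and zero-based, so last must be below total.
        if (first > last || last >= total) {
            throw new IllegalArgumentException("Inconsistent range: " + headerValue);
        }
        return new ContentRange(first, last, total);
    }

    /** Number of bytes carried by this part; both ends of the range are inclusive. */
    long length() {
        return last - first + 1;
    }
}
```

For the table's example the part covers bytes 21010 through 47021 of a 47022-byte body, that is, 26012 bytes.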
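3.1.4 HTTP Development
Netty's asynchronous, event-driven architecture performs well in both throughput and reliability, which makes it a good fit for scenarios that do not run inside a Web container. Compared with traditional Web containers such as Tomcat and Jetty, it is lighter and smaller, and it is easier to adapt and customize.
We use a file server as an introductory example of HTTP server development with Netty. The scenario is:
- The file server exposes its service over HTTP
- When a client accesses the file server from a browser, the requested path is checked; if the check fails, 403 is returned
- If the check passes, the current directory is rendered as a list of links; every directory or file is a hyperlink and can be visited recursively
- For a directory, the client can keep descending into its subdirectories and files; for a readable file, the browser can open it directly or download it via [Save target as]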
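The path check in the second bullet can be sketched on its own before reading the Netty code. The version below is an alternative to the string-based check used in the handler that follows, not a copy of it: it normalizes the requested path and rejects anything that escapes the served root. The class name `SafePathResolver` is hypothetical, and a `null` result stands for "answer with 403".

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.file.Path;

/** Hypothetical helper: maps a request URI to a path under the served root. */
final class SafePathResolver {

    /** Returns the resolved path, or null when the request should be refused. */
    static Path resolve(Path root, String uri) {
        try {
            // Percent-decode first so that encoded ".." segments are caught as well.
            // Note that URLDecoder also turns '+' into a space.
            String decoded = URLDecoder.decode(uri, "UTF-8");
            if (!decoded.startsWith("/")) {
                return null;
            }
            Path base = root.toAbsolutePath().normalize();
            // normalize() collapses "." and ".." before the containment check.
            Path candidate = base.resolve(decoded.substring(1)).normalize();
            return candidate.startsWith(base) ? candidate : null;
        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
            // Malformed escapes or characters that cannot form a path are refused.
            return null;
        }
    }
}
```

Comparing normalized paths avoids having to list every dangerous character sequence, at the cost of not filtering hidden files; that filter would still have to be applied separately.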
package com.shu.HttpProtocol;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 9:53
* @Description HTTP file server
**/
public class HttpFileServer {
private static final String DEFAULT_URL = "/";
public void run(final int port, final String url) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast("log",
new LoggingHandler(LogLevel.DEBUG)); // logging
ch.pipeline().addLast("http-decoder",
new HttpRequestDecoder()); // decodes incoming request bytes
ch.pipeline().addLast("http-aggregator",
new HttpObjectAggregator(65536)); // merges the message parts into a single FullHttpRequest (64 KiB limit)
ch.pipeline().addLast("http-encoder",
new HttpResponseEncoder()); // encodes outgoing responses
ch.pipeline().addLast("http-chunked",
new ChunkedWriteHandler()); // supports asynchronous transfer of large files
ch.pipeline().addLast("fileServerHandler",
new HttpFileServerHandler(url)); // business logic
}
});
ChannelFuture future = b.bind("127.0.0.1", port).sync();
System.out.println("HTTP file server started, URL: " + "http://127.0.0.1:"
+ port + url);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
String url = DEFAULT_URL;
if (args.length > 1)
url = args[1];
new HttpFileServer().run(port, url);
}
}
package com.shu.HttpProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 9:55
* @Description Custom handler for file requests
**/
public class HttpFileServerHandler extends
SimpleChannelInboundHandler<FullHttpRequest> {
private final String url;
public HttpFileServerHandler(String url) {
this.url = url;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
/* Reply with 400 if the request could not be decoded */
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
/* Only GET is supported */
if (!GET.equals(request.method())) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
final String uri = request.uri();
/* Decode and sanitize the URI, then map it to a local path */
final String path = sanitizeUri(uri);
if (path == null) {
sendError(ctx, FORBIDDEN);
return;
}
File file = new File(path);
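/* The file is hidden or does not exist */
if (file.isHidden() || !file.exists()) {
sendError(ctx, NOT_FOUND);
return;
}
/* The path is a directory */
if (file.isDirectory()) {
// 1. A URI ending in "/" lists the directory contents
if (uri.endsWith("/")) {
sendListing(ctx, file);
} else {
// 2. Otherwise redirect to the same URI with "/" appended
sendRedirect(ctx, uri + '/');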
}
return;
}
if (!file.isFile()) {
sendError(ctx, FORBIDDEN);
return;
}
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r"); // open the file read-only
} catch (FileNotFoundException fnfe) {
sendError(ctx, NOT_FOUND);
return;
}
long fileLength = randomAccessFile.length();
// Create the HTTP response (status line and headers only)
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
// Set Content-Length
HttpUtil.setContentLength(response, fileLength);
// Set Content-Type
setContentTypeHeader(response, file);
// Echo keep-alive if the request asked for it
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);
ChannelFuture sendFileFuture;
// Write the file as a Netty ChunkedFile, which ChunkedWriteHandler sends in 8192-byte chunks
sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0,
fileLength, 8192), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future,
long progress, long total) {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / "
+ total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future)
throws Exception {
System.out.println("Transfer complete.");
}
});
ChannelFuture lastContentFuture = ctx
.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
// Without keep-alive, the server closes the connection once the last content has been written
if (!HttpUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
private String sanitizeUri(String uri) {
try {
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
throw new Error();
}
}
if (!uri.startsWith(url)) {
return null;
}
if (!uri.startsWith("/")) {
return null;
}
uri = uri.replace('/', File.separatorChar);
if (uri.contains(File.separator + '.')
|| uri.contains('.' + File.separator) || uri.startsWith(".")
|| uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {
return null;
}
return System.getProperty("user.dir") + File.separator + uri;
}
private static final Pattern ALLOWED_FILE_NAME = Pattern
.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
private static void sendListing(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
StringBuilder buf = new StringBuilder();
String dirPath = dir.getPath();
buf.append("<!DOCTYPE html>\r\n");
buf.append("<html><head><title>");
buf.append(dirPath);
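buf.append(" directory:");
buf.append("</title></head><body>\r\n");
buf.append("<h3>");
buf.append(dirPath).append(" directory:");
buf.append("</h3>\r\n");
buf.append("<ul>");
buf.append("<li>Link: <a href=\"../\">..</a></li>\r\n");
// listFiles() returns null on an I/O error, so fall back to an empty array
File[] children = dir.listFiles();
for (File f : children == null ? new File[0] : children) {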
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}
buf.append("<li>Link: <a href=\"");
buf.append(name);
buf.append("\">");
buf.append(name);
buf.append("</a></li>\r\n");
}
buf.append("</ul></body></html>\r\n");
ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
response.content().writeBytes(buffer);
buffer.release();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(HttpHeaderNames.LOCATION, newUri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
status, Unpooled.copiedBuffer("Failure: " + status.toString()
+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(HttpHeaderNames.CONTENT_TYPE,
mimeTypesMap.getContentType(file.getPath()));
}
}
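The handler above asks `HttpUtil.isKeepAlive(request)` twice to decide whether to keep the connection open. The rule behind that decision can be sketched without Netty: HTTP/1.1 connections are persistent unless the peer sends `Connection: close`, while HTTP/1.0 connections are closed unless the peer sends `Connection: keep-alive`. The class below is a simplified illustration with a hypothetical name, not Netty's implementation.

```java
import java.util.Locale;

/** Hypothetical, simplified version of the keep-alive decision. */
final class KeepAliveRule {

    static boolean isKeepAlive(String httpVersion, String connectionHeader) {
        boolean close = false;
        boolean keepAlive = false;
        if (connectionHeader != null) {
            // The Connection header is a comma-separated, case-insensitive token list.
            for (String token : connectionHeader.split(",")) {
                String t = token.trim().toLowerCase(Locale.ROOT);
                if (t.equals("close")) {
                    close = true;
                } else if (t.equals("keep-alive")) {
                    keepAlive = true;
                }
            }
        }
        if (close) {
            return false;
        }
        // HTTP/1.1 is persistent by default; older versions must opt in.
        return "HTTP/1.1".equals(httpVersion) || keepAlive;
    }
}
```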
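- HttpServerCodec: on the server side, HttpServerCodec can act as both decoder and encoder; it combines the HttpRequestDecoder and HttpResponseEncoder that the example above adds separately.
- HttpObjectAggregator: merges multiple message parts into a single FullHttpRequest or FullHttpResponse, because the HTTP decoder produces several message objects for each HTTP message.
- ChunkedWriteHandler: supports sending large streams asynchronously (for example large file transfers) without holding too much in memory, which guards against Java out-of-memory errors.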
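The memory argument behind chunked writing can be shown with plain streams. The sketch below is an assumption-labelled illustration, not how ChunkedWriteHandler is implemented: it copies a source of any size through one reusable buffer, so memory use is bounded by the chunk size rather than by the file size. The class name `ChunkedCopy` is hypothetical; the 8192-byte chunk size in the usage note matches the value passed to `ChunkedFile` in the handler.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper: copies a stream chunk by chunk with a single buffer. */
final class ChunkedCopy {

    /** Copies everything from in to out and returns the number of chunks written. */
    static int copy(InputStream in, OutputStream out, int chunkSize) {
        byte[] buffer = new byte[chunkSize]; // the only buffer ever allocated
        int chunks = 0;
        try {
            int filled;
            while ((filled = fill(in, buffer)) > 0) {
                out.write(buffer, 0, filled);
                chunks++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }

    /** Reads until the buffer is full or the stream ends; returns the bytes read. */
    private static int fill(InputStream in, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int n = in.read(buffer, total, buffer.length - total);
            if (n < 0) {
                break;
            }
            total += n;
        }
        return total;
    }
}
```

Copying 20000 bytes with a chunk size of 8192 takes three chunks (8192, 8192, and 3616 bytes).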
3.1.5 Netty HTTP + JSON Protocol Stack Development
- Entity classes
package com.shu.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 20:41
* @Description Address
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Address {
private String street; // street
private String city; // city
private String province; // province
private String postcode; // postal code
private String state; // country
}
package com.shu.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 20:58
* @Description Customer
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Customer {
private int id;
private String userName;
private String phone;
}
package com.shu.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 20:56
* @Description Order
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private int count;
private Customer customer;
private Address address;
private Shipping shipping;
private Float countPrice;
}
package com.shu.pojo;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 20:52
* @Description Shipping method
**/
public enum Shipping {
A(0, "Standard mail"),
B(100, "Express home delivery"),
C(-100, "International express"),
D(200, "Domestic express");
private int code;
private String desc;
Shipping(int code, String desc) {
this.code = code;
this.desc = desc;
}
}
- Abstract HTTP JSON encoder
package com.shu.codec;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:04
* @Description Abstract HTTP JSON encoder
**/
@Slf4j
public abstract class AbstractHttpJsonEncoder<T> extends MessageToMessageEncoder<T> {
final static Charset UTF_8 = StandardCharsets.UTF_8;
// Encode
protected ByteBuf encode0(ChannelHandlerContext ctx, Object body) {
log.info("Encoding: converting the message to a ByteBuf");
// Serialize the message body to JSON
String jsonStr = JSON.toJSONString(body);
// Wrap the JSON text in a ByteBuf
return Unpooled.copiedBuffer(jsonStr, UTF_8);
}
}
- Abstract HTTP JSON decoder
package com.shu.codec;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:15
* @Description Abstract HTTP JSON decoder
**/
public abstract class AbstractHttpJsonDecoder<T> extends MessageToMessageDecoder<T> {
private final Class<?> clazz;
private final boolean isPrint;
private final static Charset UTF_8 = StandardCharsets.UTF_8;
protected AbstractHttpJsonDecoder(Class<?> clazz) {
this(clazz, false);
}
protected AbstractHttpJsonDecoder(Class<?> clazz, boolean isPrint) {
this.clazz = clazz;
this.isPrint = isPrint;
}
// Decode
protected Object decode0(ChannelHandlerContext ctx, ByteBuf body) {
String content = body.toString(UTF_8);
if (isPrint)
System.out.println("The body is : " + content);
Object result = JSON.parseObject(content, clazz);
return result;
}
}
- Request entity class
package com.shu.codec;
import io.netty.handler.codec.http.FullHttpRequest;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:02
* @Description HTTP JSON request
**/
public class HttpJsonRequest {
// The complete HTTP request
private FullHttpRequest request;
// The message body
private Object body;
public HttpJsonRequest(FullHttpRequest request, Object body) {
this.request = request;
this.body = body;
}
/**
* @return the request
*/
public final FullHttpRequest getRequest() {
return request;
}
/**
* @param request the request to set
*/
public final void setRequest(FullHttpRequest request) {
this.request = request;
}
/**
* @return the object
*/
public final Object getBody() {
return body;
}
/**
* @param body the body to set
*/
public final void setBody(Object body) {
this.body = body;
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "HttpJsonRequest [request=" + request + ", body =" + body + "]";
}
}
- Response entity class
package com.shu.codec;
import io.netty.handler.codec.http.FullHttpResponse;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:03
* @Description HTTP JSON response
**/
public class HttpJsonResponse {
private FullHttpResponse httpResponse;
private Object result;
public HttpJsonResponse(FullHttpResponse httpResponse, Object result) {
this.httpResponse = httpResponse;
this.result = result;
}
/**
* @return the httpResponse
*/
public final FullHttpResponse getHttpResponse() {
return httpResponse;
}
/**
* @param httpResponse the httpResponse to set
*/
public final void setHttpResponse(FullHttpResponse httpResponse) {
this.httpResponse = httpResponse;
}
/**
* @return the body
*/
public final Object getResult() {
return result;
}
/**
* @param result the result to set
*/
public final void setResult(Object result) {
this.result = result;
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "HttpJsonResponse [httpResponse=" + httpResponse + ", result="
+ result + "]";
}
}
- Request encoder and decoder
package com.shu.codec;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import java.util.List;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:17
* @Description HTTP JSON request decoder
**/
public class HttpJsonRequestDecoder extends AbstractHttpJsonDecoder<FullHttpRequest> {
public HttpJsonRequestDecoder(Class<?> clazz) {
this(clazz, false);
}
/**
* Constructor
*
* @param clazz the class to decode the body into
* @param isPrint whether to print the decoded body
*/
public HttpJsonRequestDecoder(Class<?> clazz, boolean isPrint) {
super(clazz, isPrint);
}
/**
* @param ctx channel context
* @param msg the message
* @param out output list
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) throws Exception {
if (!msg.decoderResult().isSuccess()) {
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
HttpJsonRequest request = new HttpJsonRequest(msg, decode0(ctx, msg.content()));
out.add(request);
}
/**
* For this test the error is sent directly; production code needs more robust handling
*/
private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
status, Unpooled.copiedBuffer("Failure: " + status.toString()
+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
package com.shu.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import java.net.InetAddress;
import java.util.List;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:19
* @Description HTTP JSON request encoder
**/
public class HttpJsonRequestEncoder extends AbstractHttpJsonEncoder<HttpJsonRequest> {
/**
* Wraps the request message in an HTTP message: request line, request headers, blank line, and request data
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, HttpJsonRequest msg, List<Object> out) throws Exception {
// (1) Call the parent's encode0 to convert the business object to JSON
ByteBuf body = encode0(ctx, msg.getBody());
// (2) Use the caller's HTTP request if one was supplied; otherwise build the request and its headers here
// The headers are hard-coded here; in practice they could come from a configuration file
FullHttpRequest request = msg.getRequest();
if (request == null) {
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/do", body);
HttpHeaders headers = request.headers();
headers.set(HttpHeaderNames.HOST, InetAddress.getLocalHost()
.getHostAddress());
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
headers.set(HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + ','
+ HttpHeaderValues.DEFLATE.toString());
headers.set(HttpHeaderNames.ACCEPT_CHARSET,
"ISO-8859-1,utf-8;q=0.7,*;q=0.7");
headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh");
headers.set(HttpHeaderNames.USER_AGENT,
"Netty json Http Client side");
headers.set(HttpHeaderNames.ACCEPT,
"text/html,application/json;q=0.9,*/*;q=0.8");
}
HttpUtil.setContentLength(request, body.readableBytes());
// (3) The encoded object
out.add(request);
}
}
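The encoder above sets Content-Length from `body.readableBytes()`, that is, from the encoded byte count rather than from the length of the JSON string. The difference matters as soon as the body contains non-ASCII text. The sketch below builds a raw request by hand to make this visible; the class name `JsonRequestBytes` is hypothetical, and it uses POST as an assumption for illustration, whereas the encoder above sends its body with GET.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: serializes a JSON request into raw HTTP/1.1 bytes. */
final class JsonRequestBytes {

    static byte[] build(String host, String path, String json) {
        // Encode the body first: Content-Length counts bytes, not characters.
        byte[] body = json.getBytes(StandardCharsets.UTF_8);
        String head = "POST " + path + " HTTP/1.1\r\n"
                + "Host: " + host + "\r\n"
                + "Content-Type: application/json; charset=UTF-8\r\n"
                + "Content-Length: " + body.length + "\r\n"
                + "\r\n"; // blank line that ends the header section
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);

        byte[] message = new byte[headBytes.length + body.length];
        System.arraycopy(headBytes, 0, message, 0, headBytes.length);
        System.arraycopy(body, 0, message, headBytes.length, body.length);
        return message;
    }
}
```

A body such as `{"city":"\u56db\u5ddd"}` is 13 characters long but occupies 17 bytes in UTF-8, so a Content-Length computed from `String.length()` would truncate the message.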
- Response encoder and decoder
package com.shu.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;
import java.util.List;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:19
* @Description HTTP JSON response decoder
**/
public class HttpJsonResponseDecoder extends AbstractHttpJsonDecoder<FullHttpResponse> {
public HttpJsonResponseDecoder(Class<?> clazz) {
this(clazz, false);
}
/**
* Constructor
*
* @param clazz the class to decode the body into
* @param isPrint whether to print the decoded body
*/
public HttpJsonResponseDecoder(Class<?> clazz, boolean isPrint) {
super(clazz, isPrint);
}
/**
* @param ctx channel context
* @param msg the message
* @param out output list
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, FullHttpResponse msg, List<Object> out) throws Exception {
System.out.println("Decoding...");
out.add(
new HttpJsonResponse(msg, decode0(ctx, msg.content()))
);
}
}
package com.shu.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import java.util.List;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:18
* @Description HTTP JSON response encoder: encodes into an HTTP response message (status line, message headers, blank line, and response body)
**/
public class HttpJsonResponseEncoder extends AbstractHttpJsonEncoder<HttpJsonResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, HttpJsonResponse msg, List<Object> out) throws Exception {
// Encode the result as JSON
ByteBuf body = encode0(ctx, msg.getResult());
FullHttpResponse response = msg.getHttpResponse();
if (response == null) {
response = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
} else {
response = new DefaultFullHttpResponse(msg.getHttpResponse()
.protocolVersion(), msg.getHttpResponse().status(),
body);
}
response.headers().set(CONTENT_TYPE, "application/json; charset=UTF-8");
HttpUtil.setContentLength(response, body.readableBytes());
out.add(response);
}
}
- Server
package com.shu.server;
import com.shu.codec.HttpJsonRequestDecoder;
import com.shu.codec.HttpJsonResponseEncoder;
import com.shu.pojo.Order;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:20
* @Description
**/
public class HttpJsonServer {
public void run(final int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// Inbound HttpJsonRequest needs the matching decoders:
// ByteBuf -> FullHttpRequest -> HttpJsonRequestDecoder
// Outbound HttpJsonResponse needs the matching encoders:
// HttpJsonResponseEncoder -> FullHttpResponse -> HttpResponseEncoder
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("json-decoder", new HttpJsonRequestDecoder(Order.class, true));
ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
ch.pipeline().addLast("json-encoder", new HttpJsonResponseEncoder());
ch.pipeline().addLast("jsonServerHandler", new HttpJsonServerHandler());
}
});
ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
System.out.println("HTTP order server started, URL: " + "http://localhost:"
+ port);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new HttpJsonServer().run(port);
}
}
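The order in which the server adds its handlers matters because inbound messages travel through the pipeline from head to tail, while a message written by the last handler travels back from tail to head. The model below is a plain-Java sketch of that traversal, not Netty code; the class name `PipelineOrder` is hypothetical, and the unnamed, bidirectional LoggingHandler is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of handler traversal order in a pipeline. */
final class PipelineOrder {
    enum Direction { INBOUND, OUTBOUND }

    private final List<String> names = new ArrayList<>();
    private final List<Direction> directions = new ArrayList<>();

    PipelineOrder addLast(String name, Direction direction) {
        names.add(name);
        directions.add(direction);
        return this;
    }

    /** Handlers visited by data read from the socket, head to tail. */
    List<String> inboundPath() {
        return select(Direction.INBOUND);
    }

    /** Handlers visited by a message written from the tail, tail to head. */
    List<String> outboundPath() {
        List<String> path = select(Direction.OUTBOUND);
        Collections.reverse(path);
        return path;
    }

    private List<String> select(Direction wanted) {
        List<String> path = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            if (directions.get(i) == wanted) {
                path.add(names.get(i));
            }
        }
        return path;
    }

    /** Same handler names and order as the server's initChannel above. */
    static PipelineOrder jsonServer() {
        return new PipelineOrder()
                .addLast("http-decoder", Direction.INBOUND)
                .addLast("http-aggregator", Direction.INBOUND)
                .addLast("json-decoder", Direction.INBOUND)
                .addLast("http-encoder", Direction.OUTBOUND)
                .addLast("json-encoder", Direction.OUTBOUND)
                .addLast("jsonServerHandler", Direction.INBOUND);
    }
}
```

In this model a response written by `jsonServerHandler` reaches `json-encoder` before `http-encoder`, which is why the JSON encoder has to be added after the HTTP encoder.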
package com.shu.server;
import com.shu.codec.HttpJsonRequest;
import com.shu.codec.HttpJsonResponse;
import com.shu.pojo.Address;
import com.shu.pojo.Order;
import com.shu.pojo.Shipping;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.List;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:20
* @Description
**/
public class HttpJsonServerHandler extends SimpleChannelInboundHandler<HttpJsonRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpJsonRequest msg) throws Exception {
HttpRequest request = msg.getRequest();
Order order = (Order) msg.getBody();
System.out.println("Http server receive request : " + order);
// Send the response back to the client
ChannelFuture future = ctx.writeAndFlush(new HttpJsonResponse(null, order));
if (!HttpUtil.isKeepAlive(request)) {
// Close the connection once the response has been written
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
status, Unpooled.copiedBuffer("Failure: " + status.toString()
+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
- Client
package com.shu.client;
import com.shu.codec.HttpJsonRequestEncoder;
import com.shu.codec.HttpJsonResponseDecoder;
import com.shu.pojo.Order;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:22
* @Description HTTP client
**/
public class HttpJsonClient {
public void connect(int port) throws Exception {
// Configure the client NIO thread group
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast("http-decoder",
new HttpResponseDecoder());
ch.pipeline().addLast("http-aggregator",
new HttpObjectAggregator(65536));
// JSON decoder
ch.pipeline().addLast("json-decoder", new HttpJsonResponseDecoder(Order.class, true));
ch.pipeline().addLast("http-encoder",
new HttpRequestEncoder());
ch.pipeline().addLast("json-encoder",
new HttpJsonRequestEncoder());
ch.pipeline().addLast("jsonClientHandler",
new HttpJsonClientHandler());
}
});
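// Start the asynchronous connect to the local server and wait for it to complete
ChannelFuture f = b.connect(new InetSocketAddress("127.0.0.1", port)).sync();
// Wait until the client channel is closed
f.channel().closeFuture().sync();
} finally {
// Shut down gracefully and release the NIO thread group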
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// Fall back to the default port
}
}
new HttpJsonClient().connect(port);
}
}
package com.shu.client;
import com.shu.codec.HttpJsonRequest;
import com.shu.pojo.Address;
import com.shu.pojo.Customer;
import com.shu.pojo.Order;
import com.shu.pojo.Shipping;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @Author shu
* @Version 1.0
* @Date: 2022/03/07/ 21:23
* @Description Custom client handler
**/
public class HttpJsonClientHandler extends ChannelInboundHandlerAdapter {
/**
* Called once the connection has been established
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Connected to the server...");
// Build the request to send to the server
HttpJsonRequest request = new HttpJsonRequest(null, new Order(2, new Customer(1,"ADMIN","123456"),new Address("Sichuan","Sichuan","Sichuan","Sichuan","Sichuan"), Shipping.A,125f));
// Write and flush the request
ctx.writeAndFlush(request);
}
/**
* Reads the data returned by the server
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.getClass().getName());
System.out.println("Received data..." + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
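- Observing the result
First the client sends the request message
The server receives the request message
The server returns the response
The client receives the response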