spring cloud gateway是基于webflux的项目,因而不能跟使用spring mvc一样直接获取request body,因此需要重新构造再转发。
如果我们在spring cloud gateway 封装之前读取了一次request body,比如打印request body日志,在下游获取数据的时候会出现错误:[spring cloud] [error] java.lang.IllegalStateException: Only one connection receive subscriber allowed. 因为request body只能读取一次,它是属于消费类型的。
出现这样的原因是InputStream的read()方法内部有一个postion,标志当前流被读取到的位置,每读取一次,该标志就会移动一次,如果读到最后,read()会返回-1,表示已经读取完了。如果想要重新读取则需要调用reset()方法,position就会移动到上次调用mark的位置,mark默认是0,所以就能从头再读了。调用reset()方法的前提是已经重写了reset()方法,当然能否reset也是有条件的,它取决于markSupported()方法是否返回true,InputStream默认不实现reset(),并且markSupported()默认也是返回false。综上,InputStream默认不实现reset的相关方法,而ServletInputStream也没有重写reset的相关方法,这样就无法重复读取流,这就是我们从request对象中获取的输入流就只能读取一次的原因。
直接上代码
1.实体类
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
public class GatewayLog implements Serializable {
private static final long serialVersionUID = 1983879536575766072L;
/**访问实例*/
private String targetServer;
/**请求路径*/
private String requestPath;
/**请求方法*/
private String requestMethod;
/**协议 */
private String schema;
/**请求体*/
private String requestBody;
/**响应体*/
private String responseData;
/**请求ip*/
private String ip;
/**请求时间*/
private Date requestTime;
/**响应时间*/
private Date responseTime;
/**执行时间*/
private long executeTime;
/**返回码*/
private long code;
/**返回数据类型*/
private String responseContentType;
/**请求数据类型*/
private String requestContentType;
/**请求用户id*/
private String userId;
}
2.Service
public interface AccessLogService {
void saveAccessLog(GatewayLog gatewayLog);
}
3.AccessLogFilter
import cn.hutool.core.collection.CollectionUtil;
import com.shouwei.gateway.entity.GatewayLog;
import com.shouwei.gateway.service.AccessLogService;
import com.shouwei.gateway.utils.IpUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.*;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* /**
* 全局拦截器,作用所有的微服务
* 1. 对请求的API调用过滤,记录接口的请求时间,方便日志审计、告警、分析等运维操作
* 2. 后期可以扩展对接其他日志系统
*/
@Slf4j
@Component
public class AccessLogFilter implements GlobalFilter, Ordered {
/**
* default HttpMessageReader.
*/
private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults().messageReaders();
@Autowired
private AccessLogService accessLogService;
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
/**
* 顺序必须是<-1,否则标准的NettyWriteResponseFilter将在您的过滤器得到一个被调用的机会之前发送响应
* 也就是说如果不小于 -1 ,将不会执行获取后端响应的逻辑
*
* @return
*/
@Override
public int getOrder() {
return -100;
}
@Override
@SuppressWarnings("unchecked")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 请求路径
String requestPath = request.getPath().pathWithinApplication().value();
Route route = getGatewayRoute(exchange);
String ipAddress = IpUtils.getIpAddress(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setSchema(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getId());
gatewayLog.setRequestTime(new Date());
gatewayLog.setIp(ipAddress);
MediaType mediaType = request.getHeaders().getContentType();
gatewayLog.setRequestContentType(mediaType.getType() + "/" + mediaType.getSubtype());
//json格式
if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return writeBodyLog(exchange, chain, gatewayLog);
}
//form-data格式
else if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType) || MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
return readFormData(exchange, chain, gatewayLog);
}
//其他格式
else {
return writeBasicLog(exchange, chain, gatewayLog);
}
}
private Mono<Void> writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog accessLog) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
final Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
final ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
@Override
public MultiValueMap<String, String> getQueryParams() {
return UriComponentsBuilder.fromUri(exchange.getRequest().getURI()).build().getQueryParams();
}
};
StringBuilder builder = new StringBuilder();
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
if (CollectionUtil.isNotEmpty(queryParams)) {
for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
builder.append(entry.getKey()).append("=").append(StringUtils.join(entry.getValue(), ","));
}
}
accessLog.setRequestBody(builder.toString());
//获取响应体
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
return chain.filter(exchange.mutate().request(mutatedRequest).response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(accessLog);
}));
});
}
/**
* 读取form-data数据
*
* @param exchange
* @param chain
* @param accessLog
* @return
*/
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog accessLog) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
final Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
final ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
@Override
public MultiValueMap<String, String> getQueryParams() {
return UriComponentsBuilder.fromUri(exchange.getRequest().getURI()).build().getQueryParams();
}
};
final HttpHeaders headers = exchange.getRequest().getHeaders();
if (headers.getContentLength() == 0) {
return chain.filter(exchange);
}
ResolvableType resolvableType;
if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(headers.getContentType())) {
resolvableType = ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, Part.class);
} else {
//解析 application/x-www-form-urlencoded
resolvableType = ResolvableType.forClass(String.class);
}
return MESSAGE_READERS.stream().filter(reader -> reader.canRead(resolvableType, mutatedRequest.getHeaders().getContentType())).findFirst()
.orElseThrow(() -> new IllegalStateException("no suitable HttpMessageReader.")).readMono(resolvableType, mutatedRequest, Collections.emptyMap()).flatMap(resolvedBody -> {
if (resolvedBody instanceof MultiValueMap) {
LinkedMultiValueMap map = (LinkedMultiValueMap) resolvedBody;
if (CollectionUtil.isNotEmpty(map)) {
StringBuilder builder = new StringBuilder();
final Part bodyPartInfo = (Part) ((MultiValueMap) resolvedBody).getFirst("body");
if (bodyPartInfo instanceof FormFieldPart) {
String body = ((FormFieldPart) bodyPartInfo).value();
// log.info("body ==== " + body);
builder.append("body=").append(body);
}
final Part uidPartInfo = (Part) ((MultiValueMap) resolvedBody).getFirst("uid");
if (uidPartInfo instanceof FormFieldPart) {
String uid = ((FormFieldPart) uidPartInfo).value();
// log.info("uid ==== " + uid);
accessLog.setUserId(uid);
if (builder.length() > 0) {
builder.append("&uid=").append(uid);
} else {
builder.append("uid=").append(uid);
}
}
final Part timeStampPartInfo = (Part) ((MultiValueMap) resolvedBody).getFirst("timeStamp");
if (timeStampPartInfo instanceof FormFieldPart) {
String timeStamp = ((FormFieldPart) timeStampPartInfo).value();
// log.info("timeStamp ==== " + timeStamp );
if (builder.length() > 0) {
builder.append("&timeStamp=").append(timeStamp);
} else {
builder.append("timeStamp=").append(timeStamp);
}
}
final Part tokenPartInfo = (Part) ((MultiValueMap) resolvedBody).getFirst("token");
if (tokenPartInfo instanceof FormFieldPart) {
String token = ((FormFieldPart) tokenPartInfo).value();
// log.info("token ==== " + token);
if (builder.length() > 0) {
builder.append("&token=").append(token);
} else {
builder.append("token=").append(token);
}
}
final Part signPartInfo = (Part) ((MultiValueMap) resolvedBody).getFirst("sign");
if (signPartInfo instanceof FormFieldPart) {
String sign = ((FormFieldPart) signPartInfo).value();
// log.info("sign ==== " + sign);
if (builder.length() > 0) {
builder.append("&sign=").append(sign);
} else {
builder.append("sign=").append(sign);
}
}
accessLog.setRequestBody(builder.toString());
}
} else {
accessLog.setRequestBody((String) resolvedBody);
}
//获取响应体
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
return chain.filter(exchange.mutate().request(mutatedRequest).response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(accessLog);
}));
});
});
}
/**
* 解决 request body 只能读取一次问题,
* 参考: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
*
* @param exchange
* @param chain
* @param gatewayLog
* @return
*/
@SuppressWarnings("unchecked")
private Mono writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
.flatMap(body -> {
gatewayLog.setRequestBody(body);
return Mono.just(body);
});
// 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
// 重新封装请求
ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
// 记录响应日志
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
// 记录普通的
return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
.then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(gatewayLog);
}));
}));
}
/**
* 打印日志
*
* @param gatewayLog 网关日志
* @author javadaily
* @date 2021/3/24 14:53
*/
private void writeAccessLog(GatewayLog gatewayLog) {
accessLogService.saveAccessLog(gatewayLog);
}
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
/**
* 请求装饰器,重新计算 headers
*
* @param exchange
* @param headers
* @param outputMessage
* @return
*/
private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers,
CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
/**
* 记录响应日志
* 通过 DataBufferFactory 解决响应体分段传输问题。
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setResponseTime(responseTime);
// 计算执行时间
long executeTime = (responseTime.getTime() - gatewayLog.getRequestTime().getTime());
gatewayLog.setExecuteTime(executeTime);
// 获取响应类型,如果是 json 就打印
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
// log.info("originalResponseContentType =========== " + originalResponseContentType);
gatewayLog.setResponseContentType(originalResponseContentType);
gatewayLog.setCode(this.getStatusCode().value());
// if (ObjectUtils.equals(this.getStatusCode(), HttpStatus.OK)
// && !StringUtil.isNullOrEmpty(originalResponseContentType)
// && originalResponseContentType.contains("application/json")) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// 合并多个流集合,解决返回体分段传输
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 释放掉内存
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
// log.info("responseResult =========== " + responseResult);
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
}
// }
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
}
4.引入的依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<!--huTool工具箱大全-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.5</version>
</dependency>