1. 需求背景
Gateway 专注app软件定制开发作为微服务集群的入口,专注app软件定制开发除了进行一些权限验证、header封装以外,专注app软件定制开发可能也需要对请求体body进行封装。
专注app软件定制开发比如随着业务子系统的扩展,专注app软件定制开发各子系统的请求体body专注app软件定制开发格式各不一致,例如:子系统A专注app软件定制开发的请求体入参要求是Json专注app软件定制开发格式体既可:{"name":"aaa"},专注app软件定制开发但是子系统B请求体入参要求是Json格式体,但是因为历史原因,虽然也是要求Json格式,但是在最外层进行一层封装,格式为:{body: 实际的json},这个封装的操作就可以在Gateway 的过滤器中进行封装。
再比如:我们在里面需要读取到原始请求体的入参,其中包括json格式和文件上传类型的入参,获取到这里入参后需要进行一些签名处理后,保存在header中。这种情况下,我们就需要针对不同的请求类型的请求体进行缓存。而不能全部当作json字符串请求体进行缓存。
2. 具体方法
2.1 request body 只能读取一次问题
在Gateway中通常会有一个过滤器链,而 request body 只能读取一次,也就是说,如果在过滤器A中已经读取一次,在后面的过滤器B是无法读取成功的,会抛出如下的报错:
- java.lang.IllegalStateException: Only one connection receive subscriber allowed.
- at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)
- at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:129)
- at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
- at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
- at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
- at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
- at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
- at java.lang.Thread.run(Thread.java:745)
大意就是netty的request body只能读取一次,第二次读取就报这个错误了。
问题原因
翻查GitHub终于找到,spring boot在2.0.5版本如果使用了WebFlux就自动配置HiddenHttpMethodFilter
过滤器。
查看源码发现,这个过滤器的作用是,针对当前的浏览器一般只支持GET
和POST
表单提交方法,如果想使用其他HTTP方法(如:PUT、DELETE、PATCH
),就只能通过一个隐藏的属性如(_method=PUT
)来表示,那么HiddenHttpMethodFilter
的作用是将POST
请求的_method
参数里面的value替换掉http请求的方法。
想法是很好的,用一种折中的方法来支持使浏览器支持restful方法。
如果只是使用spring boot,一切都是没有问题的,因为使用的过程中,不需要我们自己解析request body,到controller这一层,这一切就已经完成的了。
但是spring cloud gateway需要,因为它的做法就是拿到原始请求信息(包括request body),再重新封装一个request路由到下游,所以上面的问题就在于:
-
HiddenHttpMethodFilter
读取了一次request body; -
gateway的封装自己的request时,去读取request body,就报错了。
所以这个是spring cloud gateway和spring boot开发者没协商好,都去读取request body的问题。
问题解决方案
HiddenHttpMethodFilter
是spring boot在2.0.5版本自动引入的,将版本降到2.0.4即可。- 在不降版本的前提下,增加一个缓存请求体过滤器 CacheBodyGlobalFilter ,将其执行优先级设置最大(order值最小),使其在过滤器链中最先执行。
2.2 缓存请求体过滤器
实际工作中,post请求通常是分为两种,一种是json请求类型(ContentType=application/json),还有一种是上传文件类型的form表单(ContentType=multipart/form-data),可以根据请求类型的不同,分别缓存请求体body,所以这里先新建一个GatewayContext类对数据进行缓存
GatewayContext.java
- package com.test.filter;
-
- import lombok.Data;
- import org.springframework.http.codec.multipart.Part;
- import org.springframework.util.MultiValueMap;
-
- @Data
- public class GatewayContext {
-
- public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";
-
- /**
- * cache json body
- */
- private String jsonBody;
-
- /**
- *--multipart/form表单参数
- */
- private MultiValueMap<String, Part> multiPartParams;
- }
全局过滤器 CacheBodyGlobalFilter.java
- package com.test.filter;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.GlobalFilter;
- import org.springframework.core.Ordered;
- import org.springframework.core.ParameterizedTypeReference;
- import org.springframework.core.io.buffer.*;
- import org.springframework.http.HttpMethod;
- import org.springframework.http.MediaType;
- import org.springframework.http.codec.HttpMessageReader;
- import org.springframework.http.codec.ServerCodecConfigurer;
- import org.springframework.http.codec.multipart.Part;
- import org.springframework.http.server.reactive.ServerHttpRequest;
- import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
- import org.springframework.stereotype.Component;
- import org.springframework.util.MultiValueMap;
- import org.springframework.web.reactive.function.server.ServerRequest;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
-
- import java.util.List;
- import java.util.Objects;
-
-
- @Slf4j
- @Component
- public class CacheBodyGlobalFilter implements Ordered, GlobalFilter {
-
- private List<HttpMessageReader<?>> messageReaders;
- private ParameterizedTypeReference<MultiValueMap<String, Part>> MULTI_PART = new ParameterizedTypeReference<MultiValueMap<String, Part>>(){};
-
- public CacheBodyGlobalFilter(ServerCodecConfigurer configurer) {
- this.messageReaders = configurer.getReaders();
- }
-
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- GatewayContext gatewayContext = new GatewayContext();
- exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT, gatewayContext);
- ServerHttpRequest request = exchange.getRequest();
- MediaType contentType = request.getHeaders().getContentType();
- // 目前只缓存 json 和 multipart 表单两种请求类型
- if (Objects.nonNull(contentType) && Objects.nonNull(request.getMethod()) && request.getMethod().equals(HttpMethod.POST)) {
- if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
- return readMultiPartFormData(exchange, chain, gatewayContext);
- } else if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
- return readBody(exchange, chain, gatewayContext);
- }
- }
- return chain.filter(exchange);
- }
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE;
- }
-
- private Mono<Void> readMultiPartFormData(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) {
- // 当body为空时,只会执行这一个拦截器, 原因是fileMap中的代码没有执行,所以需要在body为空时构建一个空的缓存
- DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
- DefaultDataBuffer defaultDataBuffer = defaultDataBufferFactory.allocateBuffer(0);
- Mono<DataBuffer> mono = Flux.from(exchange.getRequest().getBody().defaultIfEmpty(defaultDataBuffer))
- .collectList().filter(list -> {
- log.info("请求体缓存过滤器:body为空");
- return true;
- }).map(list -> list.get(0).factory().join(list)).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
- return mono.flatMap(dataBuffer -> {
- byte[] bytes = new byte[dataBuffer.readableByteCount()];
- dataBuffer.read(bytes);
- DataBufferUtils.release(dataBuffer);
- ServerHttpRequestDecorator mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
- @Override
- public Flux<DataBuffer> getBody() {
- return Flux.defer(() -> {
- DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
- DataBufferUtils.retain(buffer);
- return Mono.just(buffer);
- });
- }
- };
- ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
- return ServerRequest.create(mutatedExchange, messageReaders).bodyToMono(MULTI_PART)
- .doOnNext(multiPartMap -> {
- gatewayContext.setMultiPartParams(multiPartMap);
- }).then(chain.filter(mutatedExchange));
- });
- }
-
- private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, GatewayContext gatewayContext) {
- // 当body为空(请求体中"{}"都不存在)时,只会执行这一个拦截器, 原因是fileMap中的代码没有执行,所以需要在body为空时构建一个空的缓存
- DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
- DefaultDataBuffer defaultDataBuffer = defaultDataBufferFactory.allocateBuffer(0);
- Mono<DataBuffer> mono = Flux.from(exchange.getRequest().getBody().defaultIfEmpty(defaultDataBuffer))
- .collectList().filter(list -> {
- log.info("请求体缓存过滤器:body为空");
- return true;
- }).map(list -> list.get(0).factory().join(list)).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
- return mono.flatMap(dataBuffer -> {
- byte[] bytes = new byte[dataBuffer.readableByteCount()];
- dataBuffer.read(bytes);
- DataBufferUtils.release(dataBuffer);
- ServerHttpRequestDecorator mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
- @Override
- public Flux<DataBuffer> getBody() {
- return Flux.defer(() -> {
- DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
- DataBufferUtils.retain(buffer);
- return Mono.just(buffer);
- });
- }
- };
- ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
- return ServerRequest.create(mutatedExchange, messageReaders)
- .bodyToMono(String.class)
- .doOnNext(objectValue -> {
- gatewayContext.setJsonBody(objectValue);
- }).then(chain.filter(mutatedExchange));
- });
- }
- }
CacheBodyGlobalFilter这个全局过滤器的目的就是把原有的request请求中的body内容读出来,并且使用ServerHttpRequestDecorator这个请求装饰器对request进行包装,重写getBody方法,并把包装后的请求放到过滤器链中传递下去。这样后面的过滤器中再使用exchange.getRequest().getBody()来获取body时,实际上就是调用的重载后的getBody方法,获取的最先已经缓存了的body数据。这样就能够实现body的多次读取了。
这个过滤器的order设置的是Ordered.HIGHEST_PRECEDENCE,即最高优先级的过滤器。优先级设置这么高的原因是某些系统内置的过滤器可能也会去读body。
说一下代码中对于body请求体为空的处理。
- // 当body为空时,只会执行这一个拦截器, 原因是fileMap中的代码没有执行,所以需要在body为空时构建一个空的缓存
- DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
- DefaultDataBuffer defaultDataBuffer = defaultDataBufferFactory.allocateBuffer(0);
测试中我发现,如果我在请求接口中如果没有body内容,就会导致程序只能执行CacheBodyGlobalFilter这一个拦截器,而无法执行其他拦截器(自定义的和默认的)。而且接口返回200,这和我的预期时不一致的。
通过测试发现,原因是,按照以上代码执行,如果是body为null,Conten-Type也为空,所以没有执行代码中的flatMap()方法.所以也就没有执行后面的调用链。
解决办法,在获取到数据流时,如果数据流为null,我们可以构建一个空的数据流,这也也就能执行我们后面的拦截器。利用Flux.defaultIfEmpty(defaultDataBuffer);的方法可以实现这个功能。
2.3 后续过滤器中读取缓存的body
以读取请求体中的入参进行签名为例:对于json请求类型,直接对所有入参取出放进签名工具类进行签名操作;而对于文件上传类型的表单multipart/form-data,将除了文件类型file的入参以外,其他的所有入参取出来进行签名。
从GatewayContext的缓存中读取请求体,使用GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);既可,因为在全局过滤器中已经使用 exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT, gatewayContext); 保存过。
- // 2.2 获取请求入参
- Map<String, String> params = new HashMap<>(exchange.getRequest().getQueryParams().entrySet().size());
- exchange.getRequest().getQueryParams().forEach((key, valueList) -> {
- params.put(key, valueList.stream().findFirst().get());
- });
- log.info("签名处理 - 读取到的请求入参为:{}", params);
- // 2.3 对post请求体中的入参进行签名
- GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
- String jsonBody = null;
- MediaType contentType = exchange.getRequest().getHeaders().getContentType();
- if (Objects.nonNull(contentType) && Objects.nonNull(exchange.getRequest().getMethod())
- && exchange.getRequest().getMethod().equals(HttpMethod.POST)) {
- if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
- // 文件表单类型读取除file外字段进行签名
- Map<String, String> paramsMap = this.readFormSignBody(gatewayContext.getMultiPartParams());
- params.putAll(paramsMap);
- } else if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
- jsonBody = gatewayContext.getJsonBody();
- }
- }
具体的从form表单中读取入参的方法如下:
- private Map<String, String> readFormSignBody(MultiValueMap<String, Part> multiPartParams) {
- Map<String, String> params = Maps.newHashMap();
- if (Objects.nonNull(multiPartParams) && !multiPartParams.isEmpty()) {
- for(Map.Entry<String, List<Part>> entry : multiPartParams.entrySet()) {
- String key = entry.getKey();
- List<Part> value = entry.getValue();
- if (StringUtils.isBlank(key) || CollectionUtils.isEmpty(value)) {
- continue;
- }
- for (Part part : entry.getValue()) {
- // 文件不参与签名
- if (part instanceof FilePart) {
- continue;
- }
- if (!(part instanceof FormFieldPart)) {
- log.error("multipart/formdata Part 类型即不是file也不是formfield,class - {}!", part.getClass().getCanonicalName());
- continue;
- }
- AtomicReference<String> valueHolder = new AtomicReference<String>();
- part.content().subscribe(buffer -> {
- byte[] datas = new byte[buffer.readableByteCount()];
- buffer.read(datas);
- DataBufferUtils.release(buffer);
- if (ArrayUtil.isNotEmpty(datas)) {
- String paramValue = new String(datas);
- if (StringUtils.isNotEmpty(paramValue)) {
- valueHolder.set(paramValue);
- }
- }
- });
- params.put(key, valueHolder.get());
- }
- }
- }
- return params;
- }
将读取到的json请求体或者form表单请求入参,使用签名工具进行签名处理。
- log.info("签名处理 - 读取到的 body 入参为: {}", jsonBody);
- String sign = ThirdUserCenterSignUtil.sign(saleAssistSignSecret, headersMap, params, jsonBody);
- log.info("最终生成的签名为: {}", sign);
- // 获取签名后,将签名值保存到请求头header中
- exchange.getRequest().mutate().header(ThirdHeaderSignEnum.X_VALIDATE_SIGN.getCode(), sign);
2.4 过滤器2中对请求体进行修改
以对json请求类型的请求体进行封装为例,如:原json请求体为:{"name" : "xxx"},现在需要封装成为{"body" : 原json},即:{"body" : "{"name" : "xxx"}"} 格式,可以使用以下方式进行封装:
- import org.apache.commons.collections4.CollectionUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
- import org.springframework.cloud.gateway.support.BodyInserterContext;
- import org.springframework.core.io.buffer.DataBuffer;
- import org.springframework.core.io.buffer.DataBufferUtils;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.HttpMethod;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.MediaType;
- import org.springframework.http.codec.multipart.FilePart;
- import org.springframework.http.codec.multipart.FormFieldPart;
- import org.springframework.http.codec.multipart.Part;
- import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
- import org.springframework.stereotype.Component;
- import org.springframework.util.MultiValueMap;
- import org.springframework.web.reactive.function.BodyInserter;
- import org.springframework.web.reactive.function.BodyInserters;
- import org.springframework.web.reactive.function.server.HandlerStrategies;
- import org.springframework.web.reactive.function.server.ServerRequest;
- import org.springframework.web.server.ServerWebExchange;
- import org.springframework.core.Ordered;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
-
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.concurrent.atomic.AtomicReference;
-
- public class ReadReqBodyFilter2 implements GlobalFilter, Ordered {
-
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) implements GlobalFilter, Ordered {
- log.info("第三方请求过滤器处理 start");
- Mono<Void> mono = chain.filter(exchange);
- if (Objects.nonNull(contentType) && Objects.nonNull(exchange.getRequest().getMethod())
- && exchange.getRequest().getMethod().equals(HttpMethod.POST)) {
- if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
- // json 请求体处理
- mono = this.transferBody(exchange, chain);
- }
- }
- log.info("第三方请求过滤器处理 end");
- return mono;
- }
-
- @Override
- public int getOrder() {
- return Ordered.HIGHEST_PRECEDENCE + 100;
- }
-
- /**
- * 修改原请求体内容
- */
- private Mono<Void> transferBody(ServerWebExchange exchange, GatewayFilterChain chain) {
- log.info("第三方请求过滤器处理 --- 请求体处理 ---- start");
- ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
- Mono modifiedBody = serverRequest.bodyToMono(String.class).flatMap(oldBody -> {
- // 对原始请求body进行封装,格式:{ "body": 原始 json 体}
- // 当然这里也可以将修改后的请求体覆盖到GatewayContext缓存中,这里没有覆盖是因为想要保留最原始的请求体内容
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("body", oldBody);
- String newBody = jsonObject.toJSONString();
- return Mono.just(newBody);
- });
- BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
- HttpHeaders headers = new HttpHeaders();
- headers.putAll(exchange.getRequest().getHeaders());
- headers.remove(HttpHeaders.CONTENT_LENGTH);
- CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
- Mono mono = bodyInserter.insert(outputMessage, new BodyInserterContext())
- .then(Mono.defer(() -> {
- ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
- exchange.getRequest()) {
- @Override
- public HttpHeaders getHeaders() {
- long contentLength = headers.getContentLength();
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.putAll(super.getHeaders());
- if (contentLength > 0) {
- httpHeaders.setContentLength(contentLength);
- } else {
- httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
- }
- return httpHeaders;
- }
-
- @Override
- public Flux<DataBuffer> getBody() {
- return outputMessage.getBody();
- }
- };
- return chain.filter(exchange.mutate().request(decorator).build());
- }));
- log.info("第三方请求过滤器处理 --- 请求体处理 ---- end");
- return mono;
- }
- }
2.5 过滤器3中最后的过滤器从缓存读取修改后请求体
- package com.test.filter;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.GlobalFilter;
- import org.springframework.core.Ordered;
- import org.springframework.core.io.buffer.DataBuffer;
- import org.springframework.core.io.buffer.DataBufferUtils;
- import org.springframework.stereotype.Component;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
-
- import java.nio.CharBuffer;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.atomic.AtomicReference;
-
- @Component
- @Slf4j
- public class ReadReqBodyFilter3 implements GlobalFilter, Ordered {
-
- /**
- * 从缓存中读取请求体
- */
- public String resolveBodyFromRequest(Flux<DataBuffer> body) {
- AtomicReference<String> bodyRef = new AtomicReference<>();
- // 缓存读取的request body信息
- body.subscribe(dataBuffer -> {
- CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());
- DataBufferUtils.release(dataBuffer);
- bodyRef.set(charBuffer.toString());
- });
- return bodyRef.get();
- }
-
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- log.info("过滤器3从缓存中读取修改后请求体body start");
- String signBody = this.resolveBodyFromRequest(exchange.getRequest().getBody());
- log.info("过滤器3从缓存中读取修改后请求体body end", signBody);
- return chain.filter(exchange);
- }
-
- @Override
- public int getOrder() {
- return 100;
- }
- }
从过滤器3的日志就可以看出:原始的请求体已经被过滤器1修改了: {body: 原始请求体json}。