WebFlux 004、WebFilter踩坑记录

一、背景使用SpringWebFlux的WebFilter时 , 由于不熟悉或一些思考疏忽 , 容易出现未知的异常 。记录一下排查与解决方案,给大家分享一下 。
二、问题2.1 问题描述在测试接口方法时 , 出现的错误信息如下(对一些项目路径做了修改):
java.lang.IllegalStateException: COMPLETED at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:Error has been observed at the following site(s): *__checkpoint ? springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain] *__checkpoint ? com.xxx.config.LoginWebFilter$$EnhancerBySpringCGLIB$$f3da6bdf [DefaultWebFilterChain] *__checkpoint ? com.xxx.config.TraceIdFilter [DefaultWebFilterChain] *__checkpoint ? HTTP POST "/abc/test/testMethod" [ExceptionHandlingWebHandler]Original Stack Trace:at org.springframework.http.server.reactive.AbstractListenerReadPublisher$State.subscribe(AbstractListenerReadPublisher.java:451)at org.springframework.http.server.reactive.AbstractListenerReadPublisher.subscribe(AbstractListenerReadPublisher.java:105)2.2 解决问题通过查看错误信息描述,checkpoint点都在webfilter中,由于对webflux也不是特别熟,所以就只有一个个测试 。
通过一系列操作 ,  把swagger移除 , 细读TraceIdFilter(内容不多),主要归功于原方案是正确的,修改后错误,最后才定位问题出现在LoginWebFilter 。
说说插曲,原实现方式(有阻塞逻辑,没出现上述异常),代码如下:
@Configuration@Slf4j@Order(-10)public class LoginWebFilter implements WebFilter {// 略...@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {ServerHttpRequest request = exchange.getRequest();if (!enableGateway) {String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)).orElse("");// 获取用户信息User user = getUser(token);if (user != null) {ServerHttpRequest mutateRequest = exchange.getRequest().mutate().build();exchange = exchange.mutate().request(mutateRequest).build();}}return chain.filter(exchange);}private User getUser(String token) {if (StringUtils.isNotBlank(token)) {return redisTemplate.opsForValue().get("xxx:tk:" + token).flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class))).block();}return null;}}这样写,没有复杂的业务逻辑 , 从上到下,完全OJBK,但是调整后 , 就出现了上述异常 。
改完后的问题代码如下:
// 错误public class LoginWebFilter implements WebFilter { /...略@Autowiredprivate ReactiveStringRedisTemplate redisTemplate;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {if (!enableGateway) {ServerHttpRequest request = exchange.getRequest();String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)).orElse("");return getUser(token).flatMap(user -> {ServerHttpRequest mutateRequest = exchange.getRequest().mutate().header(UserUtils.MEMBER_ID, user.getMemId()).header(UserUtils.MOBILE, user.getMobile()).build();ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();return chain.filter(newexchange);// 问题点}).switchIfEmpty(chain.filter(exchange));}return chain.filter(exchange);} // 不在用blockprivate Mono<User> getUser(String token) {if (StringUtils.isNotBlank(token)) {return redisTemplate.opsForValue().get("xxx:tk:" + token).flatMap(str -> Mono.justOrEmpty(JsonUtils.toObj(str, User.class)));}return Mono.empty();}}2.3 如何解决对比改造前和改造后的代码,其实差异不大,那问题出现在哪呢?
由于对webflux也不是特别熟 , 那就只能一点点试(太蠢了) 。最后发现问题出现在了switchIfEmpty(chain.filter(exchange)),在去掉了switchIfEmpty(chain.filter(exchange)),就不会在出现上述异常 。
修改后部分代码如下:
// 半正确@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {if (!enableGateway) {ServerHttpRequest request = exchange.getRequest();String token = Optional.ofNullable(request.getHeaders().getFirst(Constants.TOKEN)).orElse(“”);return getUser(token).flatMap(user -> {ServerHttpRequest mutateRequest = exchange.getRequest().mutate().header(UserUtils.MEMBER_ID, user.getMemId()).header(UserUtils.MOBILE, user.getMobile()).build();ServerWebExchange newexchange = exchange.mutate().request(mutateRequest).build();return chain.filter(newexchange);});}return chain.filter(exchange);}虽然现在不回在出现异常,但是去掉switchIfEmpty后,代码逻辑是不完整的,当获取不到User时,返回Mono.emtpy,那会直接结束流程,不在执行剩下的filter或其他逻辑 。真是连环坑,一坑接一坑 。所以对代码需要调整一番,调整后如下:

推荐阅读