spring cloud gateway版本3.1.0
客户管理系统开发定制客户端通过spring cloud gateway转发到webSocket服务器,建立webSocket连接,客户管理系统开发定制当客户端断开连接后。网关会报如下错误信息:
Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005
这个报错实测,不会造成什么功能影响。但是报错信息的产生,可能会导致系统误判。
跟踪
由于是网关报错,所以在spring cloud gateway方面进行排查。
出现问题的代码 WebsocketRoutingFilter
首先发现一个关闭的issues,发现已经解决了该报错提示
该提交记录的代码显示已经解决了报错信息。
但是在正式版本中,实际上这个问题还未解决。
因为这个原因
I think serverClose and proxyClose don’t need to be included in, and probably should be separated from the zip because they are sort of competing with the Mono from each WebSocketHandler, and zip will cancel the other publishers after one of them completes.
持续跟踪…
问题的来源
问题在于org.springframework.cloud.gateway.filter.WebsocketRoutingFilter使用的方式ReactorNettyWebSocketSession
解决方案
自定义过滤器,在WebsocketRoutingFilter的基础上进行修改。
方法一
参考
package com.qf.gateway.filter;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.beans.factory.ObjectProvider;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;import org.springframework.core.Ordered;import org.springframework.http.HttpHeaders;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;import org.springframework.web.reactive.socket.WebSocketHandler;import org.springframework.web.reactive.socket.WebSocketMessage;import org.springframework.web.reactive.socket.WebSocketSession;import org.springframework.web.reactive.socket.client.WebSocketClient;import org.springframework.web.reactive.socket.server.WebSocketService;import org.springframework.web.server.ServerWebExchange;import org.springframework.web.util.UriComponentsBuilder;import reactor.core.publisher.Mono;import java.net.URI;import java.util.*;/** * 解决websocket关闭异常 问题 * @author admin * @Desc websocket客户端主动断开连接,网关服务报错1005 * @date 2022/8/24 14:30 */@Componentpublic class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered { public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol"; private static final Log log = LogFactory.getLog(CustomWebsocketRoutingFilter.class); private final WebSocketClient webSocketClient; private final WebSocketService webSocketService; private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider; private volatile List<HttpHeadersFilter> headersFilters; public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) { this.webSocketClient = webSocketClient; this.webSocketService = webSocketService; this.headersFiltersProvider = headersFiltersProvider; } static String convertHttpToWs(String scheme) { scheme = scheme.toLowerCase(); return "http".equals(scheme) ? "ws" : ("https".equals(scheme) ? "wss" : scheme); } @Override public int getOrder() { return 2147483645; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { changeSchemeIfIsWebSocketUpgrade(exchange); URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("ws".equals(scheme) || "wss".equals(scheme))) { ServerWebExchangeUtils.setAlreadyRouted(exchange); HttpHeaders headers = exchange.getRequest().getHeaders(); HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange); List<String> protocols = this.getProtocols(headers); return this.webSocketService.handleRequest(exchange, new CustomWebsocketRoutingFilter.ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols)); } else { return chain.filter(exchange); } } List<String> getProtocols(HttpHeaders headers) { List<String> protocols = headers.get("Sec-WebSocket-Protocol"); if (protocols != null) { ArrayList<String> updatedProtocols = new ArrayList(); for(int i = 0; i < ((List)protocols).size(); ++i) { String protocol = (String)((List)protocols).get(i); updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ","))); } protocols = updatedProtocols; } return (List)protocols; } List<HttpHeadersFilter> getHeadersFilters() { if (this.headersFilters == null) { this.headersFilters = (List)this.headersFiltersProvider.getIfAvailable(ArrayList::new); this.headersFilters.add((headers, exchange) -> { HttpHeaders filtered = new HttpHeaders(); filtered.addAll(headers); filtered.remove("Host"); boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false); if (preserveHost) { String host = exchange.getRequest().getHeaders().getFirst("Host"); filtered.add("Host", host); } return filtered; }); this.headersFilters.add((headers, exchange) -> { HttpHeaders filtered = new HttpHeaders(); Iterator var3 = headers.entrySet().iterator(); while(var3.hasNext()) { Map.Entry<String, List<String>> entry = (Map.Entry)var3.next(); if (!((String)entry.getKey()).toLowerCase().startsWith("sec-websocket")) { filtered.addAll((String)entry.getKey(), (List)entry.getValue()); } } return filtered; }); } return this.headersFilters; } static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) { URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme().toLowerCase(); String upgrade = exchange.getRequest().getHeaders().getUpgrade(); if ("WebSocket".equalsIgnoreCase(upgrade) && ("http".equals(scheme) || "https".equals(scheme))) { String wsScheme = convertHttpToWs(scheme); boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl); URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri(); exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl); if (log.isTraceEnabled()) { log.trace("changeSchemeTo:[" + wsRequestUrl + "]"); } } } private static class ProxyWebSocketHandler implements WebSocketHandler { private final WebSocketClient client; private final URI url; private final HttpHeaders headers; private final List<String> subProtocols; ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) { this.client = client; this.url = url; this.headers = headers; if (protocols != null) { this.subProtocols = protocols; } else { this.subProtocols = Collections.emptyList(); } } @Override public List<String> getSubProtocols() { return this.subProtocols; } @Override public Mono<Void> handle(WebSocketSession session) { return this.client.execute(this.url, this.headers, new WebSocketHandler() { @Override public Mono<Void> handle(WebSocketSession proxySession) { Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen()) .flatMap(session::close); Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen()) .flatMap(proxySession::close); // Use retain() for Reactor Netty Mono<Void> proxySessionSend = proxySession .send(session.receive().doOnNext(WebSocketMessage::retain)); Mono<Void> serverSessionSend = session .send(proxySession.receive().doOnNext(WebSocketMessage::retain)); return Mono.zip(proxySessionSend, serverSessionSend, serverClose, proxyClose).then(); } @Override public List<String> getSubProtocols() { return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols; } }); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
方法二
参考
package com.qf.gateway.filter;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.beans.factory.ObjectProvider;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;import org.springframework.core.Ordered;import org.springframework.http.HttpHeaders;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;import org.springframework.web.reactive.socket.CloseStatus;import org.springframework.web.reactive.socket.WebSocketHandler;import org.springframework.web.reactive.socket.WebSocketMessage;import org.springframework.web.reactive.socket.WebSocketSession;import org.springframework.web.reactive.socket.client.WebSocketClient;import org.springframework.web.reactive.socket.server.WebSocketService;import org.springframework.web.server.ServerWebExchange;import org.springframework.web.util.UriComponentsBuilder;import reactor.core.publisher.Mono;import java.net.URI;import java.util.*;/** * 解决websocket关闭异常 问题 * @author admin * @Desc websocket客户端主动断开连接,网关服务报错1005 * @date 2022/8/20 14:30 */@Componentpublic class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered { public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol"; private static final Log log = LogFactory.getLog(CustomWebsocketRoutingFilter.class); private final WebSocketClient webSocketClient; private final WebSocketService webSocketService; private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider; private volatile List<HttpHeadersFilter> headersFilters; public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) { this.webSocketClient = webSocketClient; this.webSocketService = webSocketService; this.headersFiltersProvider = headersFiltersProvider; } static String convertHttpToWs(String scheme) { scheme = scheme.toLowerCase(); return "http".equals(scheme) ? "ws" : ("https".equals(scheme) ? "wss" : scheme); } @Override public int getOrder() { return 2147483645; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { changeSchemeIfIsWebSocketUpgrade(exchange); URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("ws".equals(scheme) || "wss".equals(scheme))) { ServerWebExchangeUtils.setAlreadyRouted(exchange); HttpHeaders headers = exchange.getRequest().getHeaders(); HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange); List<String> protocols = this.getProtocols(headers); return this.webSocketService.handleRequest(exchange, new CustomWebsocketRoutingFilter.ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols)); } else { return chain.filter(exchange); } } List<String> getProtocols(HttpHeaders headers) { List<String> protocols = headers.get("Sec-WebSocket-Protocol"); if (protocols != null) { ArrayList<String> updatedProtocols = new ArrayList(); for(int i = 0; i < ((List)protocols).size(); ++i) { String protocol = (String)((List)protocols).get(i); updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ","))); } protocols = updatedProtocols; } return (List)protocols; } List<HttpHeadersFilter> getHeadersFilters() { if (this.headersFilters == null) { this.headersFilters = (List)this.headersFiltersProvider.getIfAvailable(ArrayList::new); this.headersFilters.add((headers, exchange) -> { HttpHeaders filtered = new HttpHeaders(); filtered.addAll(headers); filtered.remove("Host"); boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false); if (preserveHost) { String host = exchange.getRequest().getHeaders().getFirst("Host"); filtered.add("Host", host); } return filtered; }); this.headersFilters.add((headers, exchange) -> { HttpHeaders filtered = new HttpHeaders(); Iterator var3 = headers.entrySet().iterator(); while(var3.hasNext()) { Map.Entry<String, List<String>> entry = (Map.Entry)var3.next(); if (!((String)entry.getKey()).toLowerCase().startsWith("sec-websocket")) { filtered.addAll((String)entry.getKey(), (List)entry.getValue()); } } return filtered; }); } return this.headersFilters; } static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) { URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme().toLowerCase(); String upgrade = exchange.getRequest().getHeaders().getUpgrade(); if ("WebSocket".equalsIgnoreCase(upgrade) && ("http".equals(scheme) || "https".equals(scheme))) { String wsScheme = convertHttpToWs(scheme); boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl); URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri(); exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl); if (log.isTraceEnabled()) { log.trace("changeSchemeTo:[" + wsRequestUrl + "]"); } } } private static class ProxyWebSocketHandler implements WebSocketHandler { private final WebSocketClient client; private final URI url; private final HttpHeaders headers; private final List<String> subProtocols; ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) { this.client = client; this.url = url; this.headers = headers; if (protocols != null) { this.subProtocols = protocols; } else { this.subProtocols = Collections.emptyList(); } } @Override public List<String> getSubProtocols() { return this.subProtocols; } @Override public Mono<Void> handle(WebSocketSession session) { return this.client.execute(this.url, this.headers, new WebSocketHandler() { private CloseStatus adaptCloseStatus(CloseStatus closeStatus) { int code = closeStatus.getCode(); if (code > 2999 && code < 5000) { return closeStatus; } switch (code) { case 1000: return closeStatus; case 1001: return closeStatus; case 1002: return closeStatus; case 1003: return closeStatus; case 1004: // Should not be used in a close frame // RESERVED; return CloseStatus.PROTOCOL_ERROR; case 1005: // Should not be used in a close frame // return CloseStatus.NO_STATUS_CODE; return CloseStatus.PROTOCOL_ERROR; case 1006: // Should not be used in a close frame // return CloseStatus.NO_CLOSE_FRAME; return CloseStatus.PROTOCOL_ERROR; case 1007: return closeStatus; case 1008: return closeStatus; case 1009: return closeStatus; case 1010: return closeStatus; case 1011: return closeStatus; case 1012: // Not in RFC6455 // return CloseStatus.SERVICE_RESTARTED; return CloseStatus.PROTOCOL_ERROR; case 1013: // Not in RFC6455 // return CloseStatus.SERVICE_OVERLOAD; return CloseStatus.PROTOCOL_ERROR; case 1015: // Should not be used in a close frame // return CloseStatus.TLS_HANDSHAKE_FAILURE; return CloseStatus.PROTOCOL_ERROR; default: return CloseStatus.PROTOCOL_ERROR; } } @Override public Mono<Void> handle(WebSocketSession proxySession) { Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen()) .map(this::adaptCloseStatus) .flatMap(session::close); Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen()) .map(this::adaptCloseStatus) .flatMap(proxySession::close); // Use retain() for Reactor Netty Mono<Void> proxySessionSend = proxySession .send(session.receive().doOnNext(WebSocketMessage::retain)); Mono<Void> serverSessionSend = session .send(proxySession.receive().doOnNext(WebSocketMessage::retain)); // Ensure closeStatus from one propagates to the other Mono.when(serverClose, proxyClose).subscribe(); // Complete when both sessions are done return Mono.zip(proxySessionSend, serverSessionSend).then(); } @Override public List<String> getSubProtocols() { return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols; } }); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247