Spring Cloud Gateway 转发的 WebSocket 连接,客户端主动断开连接时,网关服务报错 1005

spring cloud gateway版本3.1.0

客户端通过 Spring Cloud Gateway 转发到 WebSocket 服务器,建立 WebSocket 连接。当客户端断开连接后,网关会报如下错误信息:

Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005

经实测,这个报错不会造成功能上的影响,但报错信息的产生可能会导致系统误判。

跟踪

由于是网关报错,所以在spring cloud gateway方面进行排查。

出现问题的代码 WebsocketRoutingFilter

首先找到一个已关闭的 issue,其中显示该报错提示已经被解决。

该提交记录的代码显示已经解决了报错信息。

但是在正式版本中,这个问题实际上还未解决,原因如下:
I think serverClose and proxyClose don’t need to be included in, and probably should be separated from the zip because they are sort of competing with the Mono from each WebSocketHandler, and zip will cancel the other publishers after one of them completes.

持续跟踪…

问题的来源

问题在于 org.springframework.cloud.gateway.filter.WebsocketRoutingFilter 使用 ReactorNettyWebSocketSession 的方式。

解决方案

自定义过滤器,在WebsocketRoutingFilter的基础上进行修改。

方法一

参考

package com.qf.gateway.filter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.*;

/**
 * Global filter that proxies WebSocket upgrade requests to the routed backend.
 *
 * <p>Works around the gateway error raised when a WebSocket client closes the
 * connection itself ("WebSocket close status code does NOT comply with
 * RFC-6455: 1005"). This variant keeps the close-status propagation inside the
 * same {@code Mono.zip} as the two message pipelines.
 *
 * @author admin
 * @date 2022/8/24 14:30
 */
@Component
public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {

    /** Name of the header carrying the WebSocket sub-protocols requested by the client. */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    private static final Log log = LogFactory.getLog(CustomWebsocketRoutingFilter.class);

    private final WebSocketClient webSocketClient;

    private final WebSocketService webSocketService;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    /** Lazily built filter chain; only ever assigned a fully populated list. */
    private volatile List<HttpHeadersFilter> headersFilters;

    /**
     * @param webSocketClient client used to open the connection to the backend
     * @param webSocketService service that performs the server-side handshake with the caller
     * @param headersFiltersProvider provider of the header filters applied to the upgrade request
     */
    public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
        this.headersFiltersProvider = headersFiltersProvider;
    }

    /**
     * Maps an HTTP scheme to its WebSocket counterpart.
     *
     * @param scheme URI scheme in any letter case
     * @return {@code "ws"} for http, {@code "wss"} for https, otherwise the lower-cased input
     */
    static String convertHttpToWs(String scheme) {
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        String lowered = scheme.toLowerCase(Locale.ROOT);
        if ("http".equals(lowered)) {
            return "ws";
        }
        if ("https".equals(lowered)) {
            return "wss";
        }
        return lowered;
    }

    /**
     * @return {@code Ordered.LOWEST_PRECEDENCE - 2} (2147483645)
     */
    @Override
    public int getOrder() {
        // NOTE(review): presumably chosen to run just ahead of the stock
        // WebsocketRoutingFilter so this filter marks the exchange as routed
        // first -- confirm against the gateway version in use.
        return Ordered.LOWEST_PRECEDENCE - 2;
    }

    /**
     * Proxies the exchange when the request URL has a ws/wss scheme and the
     * exchange has not been routed yet; otherwise delegates to the chain.
     *
     * @param exchange current exchange; must carry the gateway request URL attribute
     * @param chain remaining filter chain
     * @return completion signal of the proxied session or of the rest of the chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        changeSchemeIfIsWebSocketUpgrade(exchange);

        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        boolean webSocketScheme = "ws".equals(scheme) || "wss".equals(scheme);

        if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || !webSocketScheme) {
            return chain.filter(exchange);
        }
        ServerWebExchangeUtils.setAlreadyRouted(exchange);

        HttpHeaders headers = exchange.getRequest().getHeaders();
        HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);
        List<String> protocols = getProtocols(headers);

        return this.webSocketService.handleRequest(exchange,
                new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));
    }

    /**
     * Splits every {@code Sec-WebSocket-Protocol} header value on commas.
     *
     * @param headers request headers
     * @return the individual protocol tokens, or {@code null} when the header is absent
     */
    List<String> getProtocols(HttpHeaders headers) {
        List<String> headerValues = headers.get(SEC_WEBSOCKET_PROTOCOL);
        if (headerValues == null) {
            return null;
        }
        List<String> tokens = new ArrayList<>();
        for (String value : headerValues) {
            tokens.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(value, ",")));
        }
        return tokens;
    }

    /**
     * Returns the header filters applied to the upgrade request, building them
     * on first use: the provided filters followed by a Host-header filter and a
     * filter that drops every {@code sec-websocket*} header.
     *
     * @return the filter chain
     */
    List<HttpHeadersFilter> getHeadersFilters() {
        List<HttpHeadersFilter> filters = this.headersFilters;
        if (filters == null) {
            // Build on a private copy and publish it only once complete, so a
            // concurrent request can never observe a partially populated chain
            // and the list handed out by the provider is not mutated.
            filters = new ArrayList<>(this.headersFiltersProvider.getIfAvailable(ArrayList::new));

            // Drop the Host header unless the exchange asks to preserve it.
            filters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                filtered.addAll(headers);
                filtered.remove("Host");
                boolean preserveHost = exchange
                        .getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
                if (preserveHost) {
                    String host = exchange.getRequest().getHeaders().getFirst("Host");
                    filtered.add("Host", host);
                }
                return filtered;
            });

            // Strip every header whose name starts with "sec-websocket".
            filters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
                    if (!entry.getKey().toLowerCase(Locale.ROOT).startsWith("sec-websocket")) {
                        filtered.addAll(entry.getKey(), entry.getValue());
                    }
                }
                return filtered;
            });

            this.headersFilters = filters;
        }
        return filters;
    }

    /**
     * Rewrites the gateway request URL from http/https to ws/wss when the
     * request carries an {@code Upgrade: WebSocket} header.
     *
     * @param exchange current exchange; must carry the gateway request URL attribute
     */
    static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme().toLowerCase(Locale.ROOT);
        String upgrade = exchange.getRequest().getHeaders().getUpgrade();

        boolean httpScheme = "http".equals(scheme) || "https".equals(scheme);
        if (!"WebSocket".equalsIgnoreCase(upgrade) || !httpScheme) {
            return;
        }

        String wsScheme = convertHttpToWs(scheme);
        boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);
        URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();
        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
        if (log.isTraceEnabled()) {
            log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
        }
    }

    /**
     * Handler for the caller-facing session: opens a second session to the
     * backend and pipes messages in both directions.
     */
    private static class ProxyWebSocketHandler implements WebSocketHandler {

        private final WebSocketClient client;

        private final URI url;

        private final HttpHeaders headers;

        private final List<String> subProtocols;

        /**
         * @param url backend WebSocket URL
         * @param client client used to connect to the backend
         * @param headers headers sent with the backend handshake
         * @param protocols requested sub-protocols; {@code null} is treated as none
         */
        ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
            this.client = client;
            this.url = url;
            this.headers = headers;
            this.subProtocols = (protocols != null) ? protocols : Collections.<String>emptyList();
        }

        @Override
        public List<String> getSubProtocols() {
            return this.subProtocols;
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return this.client.execute(this.url, this.headers, new WebSocketHandler() {

                @Override
                public Mono<Void> handle(WebSocketSession proxySession) {
                    // When one side reports a close status, close the other side if still open.
                    Mono<Void> serverClose = proxySession.closeStatus()
                            .filter(__ -> session.isOpen())
                            .flatMap(session::close);
                    Mono<Void> proxyClose = session.closeStatus()
                            .filter(__ -> proxySession.isOpen())
                            .flatMap(proxySession::close);

                    // Use retain() for Reactor Netty
                    Mono<Void> proxySessionSend = proxySession
                            .send(session.receive().doOnNext(WebSocketMessage::retain));
                    Mono<Void> serverSessionSend = session
                            .send(proxySession.receive().doOnNext(WebSocketMessage::retain));

                    // All four publishers are deliberately combined in one zip.
                    // NOTE(review): per the upstream discussion, zip cancels the
                    // remaining publishers once one of them completes, so the
                    // close propagation competes with the send pipelines; that is
                    // what this variant relies on to avoid forwarding status 1005.
                    return Mono.zip(proxySessionSend, serverSessionSend, serverClose, proxyClose).then();
                }

                @Override
                public List<String> getSubProtocols() {
                    return ProxyWebSocketHandler.this.subProtocols;
                }
            });
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187

方法二

参考

package com.qf.gateway.filter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.*;

/**
 * Global filter that proxies WebSocket upgrade requests to the routed backend.
 *
 * <p>Works around the gateway error raised when a WebSocket client closes the
 * connection itself ("WebSocket close status code does NOT comply with
 * RFC-6455: 1005"). This variant rewrites close statuses that must not appear
 * in a close frame before forwarding them to the other session.
 *
 * @author admin
 * @date 2022/8/20 14:30
 */
@Component
public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {

    /** Name of the header carrying the WebSocket sub-protocols requested by the client. */
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    private static final Log log = LogFactory.getLog(CustomWebsocketRoutingFilter.class);

    private final WebSocketClient webSocketClient;

    private final WebSocketService webSocketService;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    /** Lazily built filter chain; only ever assigned a fully populated list. */
    private volatile List<HttpHeadersFilter> headersFilters;

    /**
     * @param webSocketClient client used to open the connection to the backend
     * @param webSocketService service that performs the server-side handshake with the caller
     * @param headersFiltersProvider provider of the header filters applied to the upgrade request
     */
    public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
        this.headersFiltersProvider = headersFiltersProvider;
    }

    /**
     * Maps an HTTP scheme to its WebSocket counterpart.
     *
     * @param scheme URI scheme in any letter case
     * @return {@code "ws"} for http, {@code "wss"} for https, otherwise the lower-cased input
     */
    static String convertHttpToWs(String scheme) {
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        String lowered = scheme.toLowerCase(Locale.ROOT);
        if ("http".equals(lowered)) {
            return "ws";
        }
        if ("https".equals(lowered)) {
            return "wss";
        }
        return lowered;
    }

    /**
     * @return {@code Ordered.LOWEST_PRECEDENCE - 2} (2147483645)
     */
    @Override
    public int getOrder() {
        // NOTE(review): presumably chosen to run just ahead of the stock
        // WebsocketRoutingFilter so this filter marks the exchange as routed
        // first -- confirm against the gateway version in use.
        return Ordered.LOWEST_PRECEDENCE - 2;
    }

    /**
     * Proxies the exchange when the request URL has a ws/wss scheme and the
     * exchange has not been routed yet; otherwise delegates to the chain.
     *
     * @param exchange current exchange; must carry the gateway request URL attribute
     * @param chain remaining filter chain
     * @return completion signal of the proxied session or of the rest of the chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        changeSchemeIfIsWebSocketUpgrade(exchange);

        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        boolean webSocketScheme = "ws".equals(scheme) || "wss".equals(scheme);

        if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || !webSocketScheme) {
            return chain.filter(exchange);
        }
        ServerWebExchangeUtils.setAlreadyRouted(exchange);

        HttpHeaders headers = exchange.getRequest().getHeaders();
        HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);
        List<String> protocols = getProtocols(headers);

        return this.webSocketService.handleRequest(exchange,
                new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));
    }

    /**
     * Splits every {@code Sec-WebSocket-Protocol} header value on commas.
     *
     * @param headers request headers
     * @return the individual protocol tokens, or {@code null} when the header is absent
     */
    List<String> getProtocols(HttpHeaders headers) {
        List<String> headerValues = headers.get(SEC_WEBSOCKET_PROTOCOL);
        if (headerValues == null) {
            return null;
        }
        List<String> tokens = new ArrayList<>();
        for (String value : headerValues) {
            tokens.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(value, ",")));
        }
        return tokens;
    }

    /**
     * Returns the header filters applied to the upgrade request, building them
     * on first use: the provided filters followed by a Host-header filter and a
     * filter that drops every {@code sec-websocket*} header.
     *
     * @return the filter chain
     */
    List<HttpHeadersFilter> getHeadersFilters() {
        List<HttpHeadersFilter> filters = this.headersFilters;
        if (filters == null) {
            // Build on a private copy and publish it only once complete, so a
            // concurrent request can never observe a partially populated chain
            // and the list handed out by the provider is not mutated.
            filters = new ArrayList<>(this.headersFiltersProvider.getIfAvailable(ArrayList::new));

            // Drop the Host header unless the exchange asks to preserve it.
            filters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                filtered.addAll(headers);
                filtered.remove("Host");
                boolean preserveHost = exchange
                        .getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
                if (preserveHost) {
                    String host = exchange.getRequest().getHeaders().getFirst("Host");
                    filtered.add("Host", host);
                }
                return filtered;
            });

            // Strip every header whose name starts with "sec-websocket".
            filters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
                    if (!entry.getKey().toLowerCase(Locale.ROOT).startsWith("sec-websocket")) {
                        filtered.addAll(entry.getKey(), entry.getValue());
                    }
                }
                return filtered;
            });

            this.headersFilters = filters;
        }
        return filters;
    }

    /**
     * Rewrites the gateway request URL from http/https to ws/wss when the
     * request carries an {@code Upgrade: WebSocket} header.
     *
     * @param exchange current exchange; must carry the gateway request URL attribute
     */
    static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme().toLowerCase(Locale.ROOT);
        String upgrade = exchange.getRequest().getHeaders().getUpgrade();

        boolean httpScheme = "http".equals(scheme) || "https".equals(scheme);
        if (!"WebSocket".equalsIgnoreCase(upgrade) || !httpScheme) {
            return;
        }

        String wsScheme = convertHttpToWs(scheme);
        boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);
        URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();
        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
        if (log.isTraceEnabled()) {
            log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
        }
    }

    /**
     * Handler for the caller-facing session: opens a second session to the
     * backend and pipes messages in both directions.
     */
    private static class ProxyWebSocketHandler implements WebSocketHandler {

        private final WebSocketClient client;

        private final URI url;

        private final HttpHeaders headers;

        private final List<String> subProtocols;

        /**
         * @param url backend WebSocket URL
         * @param client client used to connect to the backend
         * @param headers headers sent with the backend handshake
         * @param protocols requested sub-protocols; {@code null} is treated as none
         */
        ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
            this.client = client;
            this.url = url;
            this.headers = headers;
            this.subProtocols = (protocols != null) ? protocols : Collections.<String>emptyList();
        }

        @Override
        public List<String> getSubProtocols() {
            return this.subProtocols;
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return this.client.execute(this.url, this.headers, new WebSocketHandler() {

                /**
                 * Replaces close statuses that must not be forwarded with
                 * {@link CloseStatus#PROTOCOL_ERROR}.
                 *
                 * @param closeStatus status reported by the closing session
                 * @return the same status when its code is 3000-4999 or one of
                 *         1000-1003 / 1007-1011; otherwise {@code PROTOCOL_ERROR}
                 */
                private CloseStatus adaptCloseStatus(CloseStatus closeStatus) {
                    int code = closeStatus.getCode();
                    if (code > 2999 && code < 5000) {
                        return closeStatus;
                    }
                    switch (code) {
                        case 1000:
                        case 1001:
                        case 1002:
                        case 1003:
                        case 1007:
                        case 1008:
                        case 1009:
                        case 1010:
                        case 1011:
                            return closeStatus;
                        default:
                            // 1004 (reserved), 1005 (NO_STATUS_CODE), 1006 (NO_CLOSE_FRAME)
                            // and 1015 (TLS_HANDSHAKE_FAILURE) should not be used in a
                            // close frame; 1012 (SERVICE_RESTARTED) and 1013
                            // (SERVICE_OVERLOAD) are not in RFC 6455; every other code
                            // is treated the same way.
                            return CloseStatus.PROTOCOL_ERROR;
                    }
                }

                @Override
                public Mono<Void> handle(WebSocketSession proxySession) {
                    // When one side reports a close status, close the other side
                    // (if still open) with the adapted status.
                    Mono<Void> serverClose = proxySession.closeStatus()
                            .filter(__ -> session.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(session::close);
                    Mono<Void> proxyClose = session.closeStatus()
                            .filter(__ -> proxySession.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(proxySession::close);

                    // Use retain() for Reactor Netty
                    Mono<Void> proxySessionSend = proxySession
                            .send(session.receive().doOnNext(WebSocketMessage::retain));
                    Mono<Void> serverSessionSend = session
                            .send(proxySession.receive().doOnNext(WebSocketMessage::retain));

                    // Ensure closeStatus from one propagates to the other
                    Mono.when(serverClose, proxyClose).subscribe();

                    // Complete when both sessions are done
                    return Mono.zip(proxySessionSend, serverSessionSend).then();
                }

                @Override
                public List<String> getSubProtocols() {
                    return ProxyWebSocketHandler.this.subProtocols;
                }
            });
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发