Handling frequent WebSocket disconnections between the backend and Android

Configuring nginx to forward WebSocket (ws) traffic

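location ~ /(mq|ws)/ {
    proxy_redirect off;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

    # WebSocket needs HTTP/1.1 plus the Upgrade/Connection headers forwarded to the upstream
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    # Allow up to 600 s between two reads / two writes on the proxied connection (default: 60 s)
    proxy_read_timeout 600s;
    proxy_send_timeout 600s;

    proxy_pass http://mq-service;
}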

Add the following two headers; together they ask the upstream server to upgrade the protocol to WebSocket:

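# $connection_upgrade is not built in; define it once in the http block:
# map $http_upgrade $connection_upgrade { default upgrade; '' close; }
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;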

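By default, nginx closes the proxied connection if no data is transferred for 60 seconds; the proxy_read_timeout directive can extend this period. Alternatively, the origin (upstream) server can send ping frames periodically, which keeps the connection alive and checks whether it is still in use.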

  • proxy_read_timeout

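    Sets the timeout for reading a response from the proxied server, i.e. how long nginx waits for the response. The timeout applies between two successive read operations, not to receiving the whole response.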

  • proxy_send_timeout

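    Sets the timeout for transmitting a request to the upstream server. The timeout applies between two successive write operations, not to the whole transmission. If the upstream server receives no new data within this time, nginx closes the connection.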
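Pings sent by the origin are data that nginx reads from the upstream, so sending them more often than proxy_read_timeout keeps the read timer from ever expiring on an idle connection. The sketch below shows that rule on the server side. It is JDK-only and assumes a hypothetical `PingableConnection` interface; in a `javax.websocket` application an implementation would wrap `Session.isOpen()` and `session.getBasicRemote().sendPing(ByteBuffer)`. `KeepAlivePinger` and its methods are illustrative names, not part of any library.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical abstraction over one server-side WebSocket connection.
interface PingableConnection {
    boolean isOpen();
    void sendPing() throws Exception;
}

// Pings every registered connection at a fixed interval shorter than proxy_read_timeout.
class KeepAlivePinger {
    private final List<PingableConnection> connections = new CopyOnWriteArrayList<>();
    private final long intervalSeconds;
    private ScheduledExecutorService scheduler;

    KeepAlivePinger(long intervalSeconds, long proxyReadTimeoutSeconds) {
        // A ping interval >= the proxy timeout would let nginx close idle connections first
        if (intervalSeconds <= 0 || intervalSeconds >= proxyReadTimeoutSeconds) {
            throw new IllegalArgumentException(
                    "ping interval must be positive and shorter than proxy_read_timeout");
        }
        this.intervalSeconds = intervalSeconds;
    }

    void register(PingableConnection connection) {
        connections.add(connection);
    }

    // One round: ping open connections, drop closed or failing ones; returns how many were pinged.
    int pingAll() {
        int sent = 0;
        for (PingableConnection c : connections) { // iterates over a snapshot, so removal is safe
            if (!c.isOpen()) {
                connections.remove(c);
                continue;
            }
            try {
                c.sendPing();
                sent++;
            } catch (Exception e) {
                connections.remove(c);
            }
        }
        return sent;
    }

    // Runs pingAll() on a background thread every intervalSeconds.
    synchronized void start() {
        if (scheduler == null) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(this::pingAll, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        }
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }
}
```

With the 600 s timeout from the config above, an interval such as 30 s (an arbitrary example value) leaves a wide margin. The constructor rejects intervals that would not beat the timeout.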

Heartbeat mechanism on Android / WeChat mini programs

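  1. Send a heartbeat packet at a fixed interval. If sending fails, reconnect.
  2. After reconnecting, immediately re-invoke key operations as the situation requires.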
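The two client-side rules above can be combined into a single `tick()` method that the client's timer calls periodically. This is a Java illustration (as an Android client might use) assuming a hypothetical `HeartbeatTransport` interface that wraps the real WebSocket client; the timer itself, backoff between reconnect attempts, and the heartbeat payload format are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper around the client's WebSocket; only these two operations are assumed.
interface HeartbeatTransport {
    boolean send(String text); // false if the frame could not be sent
    boolean connect();         // true if a new connection was established
}

// Rule 1: send a heartbeat; on failure, reconnect.
// Rule 2: after a successful reconnect, immediately re-run the registered key operations.
class HeartbeatClient {
    private final HeartbeatTransport transport;
    private final String heartbeatPayload;
    private final List<Runnable> onReconnect = new ArrayList<>();

    HeartbeatClient(HeartbeatTransport transport, String heartbeatPayload) {
        this.transport = transport;
        this.heartbeatPayload = heartbeatPayload;
    }

    // Registers a key operation to repeat after every reconnect
    // (hypothetical examples: re-sending a subscription, re-fetching current state).
    void afterReconnect(Runnable action) {
        onReconnect.add(action);
    }

    // Called on each timer tick; returns true if the connection is usable afterwards.
    boolean tick() {
        if (transport.send(heartbeatPayload)) {
            return true;
        }
        if (!transport.connect()) {
            return false; // still down; the next tick tries again
        }
        for (Runnable action : onReconnect) {
            action.run();
        }
        return true;
    }
}
```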

Reference:
https://www.xncoding.com/2018/03/12/fullstack/nginx-websocket.html

A solution for deploying WebSocket as multiple instances

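The service is scaled with Kubernetes (k8s), and the goal is to be able to change the number of containers without changing any code.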

Problems encountered

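  1. There is no way to know which WS instance a given client has connected to.
  2. When a WS message has to be sent, calling every WS instance by URL is not viable: whenever the number of containers changes, the code must change too (new URLs have to be added).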

[Figure: architecture diagram]
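From the text, the flow is: business modules publish WS messages to the MQ; every WS instance consumes them and delivers a message only if the target client is connected to that instance, otherwise it drops it. For this to work, every instance must receive every message, which suggests the MQ consumer uses broadcast consumption rather than load-balanced (clustering) consumption; the post does not state this, so treat it as an inference. The in-memory simulation below, with hypothetical classes `BroadcastTopic`, `WsInstance` and `WsMessage`, shows why the publisher never needs instance URLs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A WS message published by a business module: target token plus payload.
final class WsMessage {
    final String token;
    final String text;

    WsMessage(String token, String text) {
        this.token = token;
        this.text = text;
    }
}

// One WS instance (one pod). It only knows the clients connected to it.
class WsInstance {
    // token -> frames delivered to that client; stands in for token -> Session
    final Map<String, List<String>> localSessions = new ConcurrentHashMap<>();

    void connect(String token) {
        localSessions.put(token, new ArrayList<>());
    }

    // Deliver if the client is connected here, otherwise drop the message.
    boolean onMqMessage(WsMessage m) {
        List<String> outbox = localSessions.get(m.token);
        if (outbox == null) {
            return false;
        }
        outbox.add(m.text);
        return true;
    }
}

// Stand-in for an MQ topic consumed in broadcast mode: every subscriber gets every message.
class BroadcastTopic {
    private final List<WsInstance> subscribers = new ArrayList<>(); // single-threaded sketch

    void subscribe(WsInstance instance) {
        subscribers.add(instance);
    }

    // Returns how many instances delivered the message (0 means the client is offline everywhere).
    int publish(WsMessage m) {
        int delivered = 0;
        for (WsInstance instance : subscribers) {
            if (instance.onMqMessage(m)) {
                delivered++;
            }
        }
        return delivered;
    }
}
```

Scaling out only adds another `subscribe(...)` call (in Kubernetes, a new pod subscribing on startup) while `publish` stays unchanged, which is the property the second problem asks for.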

Code

Establishing the connection

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// EndpointConf, WsService and AccountType are project classes not shown in this post.
@ServerEndpoint(value = "/ws/{role}/{token}", configurator = EndpointConf.class)
@Component
@Slf4j
public class WsController {

    private static final String PARAM_TOKEN = "token";
    private static final String PARAM_ROLE = "role";
    // Roles accepted in the URL path: "driver" and "passenger"
    private static final Set<String> ROLE_SET = new HashSet<>(
            Arrays.asList(AccountType.DRIVER.name().toLowerCase(), AccountType.PASSENGER.name().toLowerCase())
    );

    @Autowired
    private WsService wsService;

    @OnOpen
    public void onOpen(@PathParam(PARAM_ROLE) String role,
                       @PathParam(PARAM_TOKEN) String token, Session session) throws IOException {
        if (!ROLE_SET.contains(role)) {
            // Invalid login role
            log.warn("token:{} login role error, role:{}", token, role);
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
            return;
        }

        int userId = wsService.getUserIdByToken(role, token);
        if (userId == -1) {
            // No userId found for this token: the user is logged out
            log.warn("token:{} login error, you are offline", token);
            wsService.sessionMap.remove(token);
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
            return;
        }
        log.info("【{}】, token : {} open websocket connect", wsService.showInfoAboutToken(token), token);

        // Register the new session; put() atomically returns the session previously held for this token
        Session oldSession = wsService.sessionMap.put(token, session);
        if (oldSession != null && oldSession != session) {
            // Same token logged in again: notify and close the old connection
            wsService.sendMessage(oldSession, wsService.duplicateLoginMsg());
            oldSession.close();
        }
    }

    @OnClose
    public void onClose(@PathParam(PARAM_ROLE) String role,
                        @PathParam(PARAM_TOKEN) String token, Session session) {
        log.info("close connection. 【{}】, token: {}", wsService.showInfoAboutToken(token), token);
        // Remove only if the map still points at THIS session, so closing a replaced
        // (duplicate-login) session does not evict the newer one.
        // The session is already closing, so nothing is sent to it here.
        wsService.sessionMap.remove(token, session);
    }

    @OnError
    public void onError(@PathParam(PARAM_ROLE) String role,
                        @PathParam(PARAM_TOKEN) String token, Session session, Throwable error) {
        // Throwable passed last without a placeholder, so SLF4J logs its stack trace
        log.error("【{}】, token : {}, sessionId: {}, websocket error",
                wsService.showInfoAboutToken(token), token, session.getId(), error);
    }

    @OnMessage
    public void onMessage(@PathParam(PARAM_ROLE) String role,
                          @PathParam(PARAM_TOKEN) String token, String message, Session session) throws IOException {
        log.info("receive from 【{}】, token : {}, message: {}", wsService.showInfoAboutToken(token), token, message);
        if (!ROLE_SET.contains(role)) {
            // Invalid login role: close the session and stop processing
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
            return;
        }
        // Cache the driver's heartbeat
        if (role.equals(AccountType.DRIVER.name().toLowerCase())) {
            wsService.updateHeartBeat(token);
        }
        wsService.actionHandle(session, message);
    }
}
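The duplicate-login handling in `onOpen` amounts to "the latest login for a token wins". A JDK-only sketch of that step follows, using a hypothetical `FakeSession` in place of `javax.websocket.Session` and a hypothetical `SessionRegistry` in place of `wsService.sessionMap`. It relies on two atomic `ConcurrentHashMap` operations: `put` returns the previous session so it can be closed, and `remove(key, value)` deletes an entry only if it still points at the closing session, so a late `@OnClose` for a replaced session cannot evict the newer one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for javax.websocket.Session; only close() matters here.
class FakeSession {
    boolean closed;

    void close() {
        closed = true;
    }
}

// token -> session registry with "latest login wins" semantics.
class SessionRegistry {
    private final Map<String, FakeSession> byToken = new ConcurrentHashMap<>();

    // @OnOpen: register the new session and close whatever was registered before.
    // put() is atomic, so of two concurrent logins only the later one stays registered.
    void open(String token, FakeSession session) {
        FakeSession old = byToken.put(token, session);
        if (old != null && old != session) {
            old.close(); // the real code first sends wsService.duplicateLoginMsg()
        }
    }

    // @OnClose: conditional remove, a no-op if the token already maps to a newer session.
    void closed(String token, FakeSession session) {
        byToken.remove(token, session);
    }

    FakeSession get(String token) {
        return byToken.get(token);
    }
}
```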

Receiving the MQ messages that other modules publish for WebSocket delivery

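import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
// Assumption: the MQ client is the Aliyun ONS (RocketMQ) Java SDK, whose
// MessageListener / Action / ConsumeContext / Message types match this code
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

// Consumes the WS messages that other modules publish to the MQ
@Slf4j
public class WsMqMsgListener implements MessageListener {

    @Autowired
    private WsService wsService;

    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decode the body explicitly as UTF-8 instead of the platform default charset
        String bodyStr = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("receive tag:{}, body:{}", message.getTag(), bodyStr);
        try {
            // Empty body: nothing to deliver, acknowledge it
            if (bodyStr.isEmpty()) {
                return Action.CommitMessage;
            }

            JSONObject body = JSONObject.parseObject(bodyStr);
            log.info("got a websocket mq msg");
            WebSocketMqMsg.Body wsMqBody = JSON.toJavaObject(body, WebSocketMqMsg.Body.class);
            // Deliver to the target clients connected to this instance, if any
            wsService.sendMessage(wsMqBody);

            return Action.CommitMessage;
        } catch (Exception e) {
            // Consumption failed: log with stack trace and ask the broker to redeliver later
            log.error("Failed to consume MQ message", e);
            return Action.ReconsumeLater;
        }
    }
}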

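After receiving an MQ message from a module, the service extracts the recipients and the content and sends the message. If no connection for the target client is found on this instance, the WS message is discarded.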

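// Excerpt from WsService. cachedThreadPool, cacheService, Constants, LoginType,
// AccountType and StringUtils come from the rest of the project and are not shown here.
// JDK / JSR 356 types used: java.io.IOException, java.nio.ByteBuffer, java.util.Map,
// java.util.concurrent.ConcurrentHashMap, javax.websocket.Session

// token -> session, holding only the clients connected to THIS instance
public Map<String, Session> sessionMap = new ConcurrentHashMap<>();

public void sendMessage(WebSocketMqMsg.Body message) {
    if (message.getIds().isEmpty()) {
        return;
    }
    for (Integer id : message.getIds()) {
        cachedThreadPool.execute(() -> {
            // Passengers can be logged in on two terminals (1 = APP, 2 = WeChat mini program),
            // drivers only on the APP
            int maxIdx = message.getRole().equals(AccountType.PASSENGER.name()) ? 2 : 1;
            for (int i = 1; i <= maxIdx; i++) {
                // Cache key under which the login token for (role, terminal, id) is stored
                String key = String.format(Constants.CACHE_USER_TOKEN_LOGIN_PREFIX,
                        message.getRole(),
                        LoginType.valueOf((short) i), id);
                log.info("key is {}", key);

                String token = cacheService.getVal(key);
                log.info("token is {}", token == null ? "null" : token);

                boolean sendOfflineMsg = false;
                if (!StringUtils.isEmpty(token)) {
                    // Only the instance holding the client's connection finds a session;
                    // on every other instance it is null and the message is dropped
                    Session session = sessionMap.get(token);
                    log.info("session is {}", session == null ? "null" : "not null");
                    if (session == null || !sendMessage(session, message.getMsg().toString())) {
                        sendOfflineMsg = true;
                    }
                } else {
                    sendOfflineMsg = true;
                }
                log.info("ws msg: role -> 【{}】, id -> 【{}】, terminal -> 【{}】, status -> 【{}】",
                        message.getRole(),
                        id,
                        i == 1 ? LoginType.APP.name() : LoginType.WECHAT_APPLET.name(),
                        sendOfflineMsg ? "offline" : "online");
            }
        });
    }
}

public boolean sendMessage(Session session, String message) {
    if (!session.isOpen()) {
        return false;
    }
    // Several pool threads may write to the same session, and RemoteEndpoint.Basic throws
    // IllegalStateException on concurrent sends, so serialize writes per session
    synchronized (session) {
        try {
            // An unsolicited Pong frame is allowed by RFC 6455 and acts as a one-way heartbeat.
            // sendPong() takes a ByteBuffer payload and can throw IOException, so it belongs here
            session.getBasicRemote().sendPong(ByteBuffer.allocate(0));
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            // Exception passed without a placeholder, so SLF4J logs the stack trace
            log.error("send message to {} error", session.getId(), e);
            return false;
        }
    }
    return true;
}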
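The lookup chain in `sendMessage(WebSocketMqMsg.Body)` is: user id → login token (read from the cache, one per terminal) → local session. The sketch below isolates that chain with plain maps so it runs without Redis or a WebSocket container. The key format, the class name `RecipientResolver` and the terminal names as strings are assumptions for illustration; the real code uses `Constants.CACHE_USER_TOKEN_LOGIN_PREFIX`, `LoginType` and `cacheService`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Resolves which of a user's terminals are connected to THIS instance.
class RecipientResolver {
    // Hypothetical key format standing in for Constants.CACHE_USER_TOKEN_LOGIN_PREFIX
    static String tokenKey(String role, String terminal, int userId) {
        return String.format("login:token:%s:%s:%d", role, terminal, userId);
    }

    private final Map<String, String> tokenCache; // stands in for cacheService
    private final Map<String, ?> localSessions;   // token -> session on this instance

    RecipientResolver(Map<String, String> tokenCache, Map<String, ?> localSessions) {
        this.tokenCache = tokenCache;
        this.localSessions = localSessions;
    }

    List<String> onlineTerminals(String role, int userId) {
        // Passengers may use the app and the WeChat mini program, drivers only the app
        String[] terminals = "PASSENGER".equals(role)
                ? new String[] {"APP", "WECHAT_APPLET"}
                : new String[] {"APP"};
        List<String> online = new ArrayList<>();
        for (String terminal : terminals) {
            String token = tokenCache.get(tokenKey(role, terminal, userId));
            // No token: logged out on that terminal. Token but no local session:
            // connected elsewhere (or not at all), so this instance drops the message.
            if (token != null && !token.isEmpty() && localSessions.containsKey(token)) {
                online.add(terminal);
            }
        }
        return online;
    }
}
```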

Afterword

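Deploying the multi-instance WS service with the architecture above solves both problems mentioned earlier. The MQ does its job as the intermediary: modules publish to it instead of calling WS instances by URL, so they never need to know which instance a client is connected to, and the number of containers can change without code changes.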