WebSocket多实例部署时的一种解决方案

需要用到k8s进行扩展,在变更容器数量的时候,希望达到不改动代码。

遇到的问题

  1. Client与哪一个WS服务建立连接是不知道的
  2. 当需要发送WS消息时,使用URL发送给所有的WS模块不可取(一旦容器数量改变,还需要修改代码,即增加新的URL)

架构图

代码

建立连接部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@ServerEndpoint(value = "/ws/{role}/{token}", configurator = EndpointConf.class)
@Component
@Slf4j
public class WsController {

private static final String PARAM_TOKEN = "token";
private static final String PARAM_ROLE = "role";
private static final Set<String> ROLE_SET = new HashSet<>(
Arrays.asList(AccountType.DRIVER.name().toLowerCase(), AccountType.PASSENGER.name().toLowerCase())
);

@Autowired
private WsService wsService;

@OnOpen
public void onOpen(@PathParam(PARAM_ROLE) String role,
@PathParam(PARAM_TOKEN) String token, Session session) throws IOException {
if (!ROLE_SET.contains(role)) {
// 登陆类型不正确
log.warn("token:{} login role error, role:{}", token, role);
wsService.sendMessage(session, wsService.authFailMsg());
session.close();
return;
}

int userId = wsService.getUserIdByToken(role, token);
if (userId == -1) {
// 根据token找不到userId
log.warn("token:{} login error, you are offline", token);
wsService.sessionMap.remove(token);
wsService.sendMessage(session, wsService.authFailMsg());
session.close();
return;
}
log.info("【{}】, token : {} open websocket connect", wsService.showInfoAboutToken(token), token);

// 删除此token已有session
Session oldSession = wsService.sessionMap.get(token);
if (oldSession != null) {
wsService.sessionMap.remove(token);
wsService.sendMessage(oldSession, wsService.duplicateLoginMsg());
oldSession.close();
}
wsService.sessionMap.put(token, session);
}

@OnClose
public void onClose(@PathParam(PARAM_ROLE) String role,
@PathParam(PARAM_TOKEN) String token, Session session) {
log.info("close connection. 【{}】, token: {}", wsService.showInfoAboutToken(token), token);
wsService.sessionMap.remove(token);
wsService.sendMessage(session, wsService.authFailMsg());
}

@OnError
public void onError(@PathParam(PARAM_ROLE) String role,
@PathParam(PARAM_TOKEN) String token, Session session, Throwable error) {
log.error("【{}】, token : {}, sessionId: {}, websocket error: {}", wsService.showInfoAboutToken(token), token, session.getId(), error);
}

@OnMessage
public void onMessage(@PathParam(PARAM_ROLE) String role,
@PathParam(PARAM_TOKEN) String token, String message, Session session) throws IOException {
log.info("receive from 【{}】, token : {}, message: {}",wsService.showInfoAboutToken(token), token, message);
if (!ROLE_SET.contains(role)) {
// 登陆类型不正确
wsService.sendMessage(session, wsService.authFailMsg());
session.close();
}
//司机心跳缓存
if(role.equals(AccountType.DRIVER.name().toLowerCase())){
wsService.updateHeartBeat(token);
}
wsService.actionHandle(session, message);
}
}

接收各个模块发送WS的MQ消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class WsMqMsgListener implements MessageListener {

@Autowired
private WsService wsService;

@Override
public Action consume(Message message, ConsumeContext context) {
log.info("receive tag:{}, body:{}", message.getTag(), new String(message.getBody()));
try {
//消息体执行内容
String bodyStr = new String(message.getBody());

if (StringUtils.isEmpty(bodyStr))
return Action.CommitMessage;

JSONObject body = JSONObject.parseObject(bodyStr);
log.info("got a websocket mq msg");
WebSocketMqMsg.Body wsMqBody = JSON.toJavaObject(body, WebSocketMqMsg.Body.class);
wsService.sendMessage(wsMqBody);

return Action.CommitMessage;
} catch (Exception e) {
e.printStackTrace();
log.error("消费MQ消息失败,原因是:{}", e.getMessage());
//消费失败
return Action.ReconsumeLater;
}
}
}

收到各个模块的MQ消息后,提取出发送对象、发送内容,然后进行发送。如果没有找到对应客户端的连接,那么将抛弃掉该WS消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public Map<String, Session> sessionMap = new ConcurrentHashMap<>();
public void sendMessage(WebSocketMqMsg.Body message) {
if (message.getIds().size() > 0) {
for (Integer id : message.getIds()) {
cachedThreadPool.execute(() -> {
int maxIdx = message.getRole().equals(AccountType.PASSENGER.name()) ? 2 : 1;
for (int i = 1; i <= maxIdx; i++) {

String key = String.format(Constants.CACHE_USER_TOKEN_LOGIN_PREFIX,
message.getRole(),
LoginType.valueOf((short) i), id);
log.info("key is {}", key);

String token = cacheService.getVal(key);
log.info("token is {}", token == null ? "null" : token);

boolean sendOfflineMsg = false;
if (!StringUtils.isEmpty(token)) {
Session session = sessionMap.get(token);
log.info("session is {}", session == null ? "null" : "not null");
if (session == null || !sendMessage(session,
message.getMsg().toString())) {
sendOfflineMsg = true;
}
} else {
sendOfflineMsg = true;
}
log.info("ws msg: role -> 【{}】, id -> 【{}】, terminal -> 【{}】, status -> 【{}】",
message.getRole(),
id,
i == 1 ? LoginType.APP.name() : LoginType.WECHAT_APPLET.name(),
sendOfflineMsg ? "offline" : "online");
}
});
}
}
}

public boolean sendMessage(Session session, String message) {
session.getBasicRemote().sendPong();
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("send message to {} error {}", session.getId(), e);
return false;
}
return true;
}

后记

按照上述架构完成的多实例WS服务部署,可以解决前面提到的两个问题。MQ作为一个中间这的角色,发挥出了它的作用。

WebSocket多实例部署时的一种解决方案

https://eucham.me/2019/08/20/f51277d4e8ec.html

作者

遇寻

发布于

2019-08-20

更新于

2021-02-09

许可协议

评论