WebSocket多实例部署时的一种解决方案

需要用到k8s进行扩展,在变更容器数量的时候,希望达到不改动代码。

遇到的问题

  1. Client与哪一个WS服务建立连接是不知道的
  2. 当需要发送WS消息时,使用URL发送给所有的WS模块不可取(一旦容器数量改变,还需要修改代码,即增加新的URL)

架构图

代码

建立连接部分

@ServerEndpoint(value = "/ws/{role}/{token}", configurator = EndpointConf.class)
@Component
@Slf4j
public class WsController {

    private static final String PARAM_TOKEN = "token";
    private static final String PARAM_ROLE = "role";
    private static final Set<String> ROLE_SET = new HashSet<>(
            Arrays.asList(AccountType.DRIVER.name().toLowerCase(), AccountType.PASSENGER.name().toLowerCase())
    );

    @Autowired
    private WsService wsService;

    @OnOpen
    public void onOpen(@PathParam(PARAM_ROLE) String role,
                       @PathParam(PARAM_TOKEN) String token, Session session) throws IOException {
        if (!ROLE_SET.contains(role)) {
            // 登陆类型不正确
            log.warn("token:{} login role error, role:{}", token, role);
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
            return;
        }

        int userId = wsService.getUserIdByToken(role, token);
        if (userId == -1) {
            // 根据token找不到userId
            log.warn("token:{} login error, you are offline", token);
            wsService.sessionMap.remove(token);
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
            return;
        }
        log.info("【{}】, token : {} open websocket connect", wsService.showInfoAboutToken(token), token);

        // 删除此token已有session
        Session oldSession = wsService.sessionMap.get(token);
        if (oldSession != null) {
            wsService.sessionMap.remove(token);
            wsService.sendMessage(oldSession, wsService.duplicateLoginMsg());
            oldSession.close();
        }
        wsService.sessionMap.put(token, session);
    }

    @OnClose
    public void onClose(@PathParam(PARAM_ROLE) String role,
                        @PathParam(PARAM_TOKEN) String token, Session session) {
        log.info("close connection. 【{}】, token: {}", wsService.showInfoAboutToken(token), token);
        wsService.sessionMap.remove(token);
        wsService.sendMessage(session, wsService.authFailMsg());
    }

    @OnError
    public void onError(@PathParam(PARAM_ROLE) String role,
                        @PathParam(PARAM_TOKEN) String token, Session session, Throwable error) {
        log.error("【{}】, token : {}, sessionId: {}, websocket error: {}", wsService.showInfoAboutToken(token), token, session.getId(), error);
    }

    @OnMessage
    public void onMessage(@PathParam(PARAM_ROLE) String role,
                          @PathParam(PARAM_TOKEN) String token, String message, Session session) throws IOException {
        log.info("receive from 【{}】, token : {}, message: {}",wsService.showInfoAboutToken(token), token, message);
        if (!ROLE_SET.contains(role)) {
            // 登陆类型不正确
            wsService.sendMessage(session, wsService.authFailMsg());
            session.close();
        }
        //司机心跳缓存
        if(role.equals(AccountType.DRIVER.name().toLowerCase())){
            wsService.updateHeartBeat(token);
        }
        wsService.actionHandle(session, message);
    }
}

接收各个模块发送WS的MQ消息

@Slf4j
public class WsMqMsgListener implements MessageListener {

    @Autowired
    private WsService wsService;

    @Override
    public Action consume(Message message, ConsumeContext context) {
        log.info("receive tag:{}, body:{}", message.getTag(), new String(message.getBody()));
        try {
            //消息体执行内容
            String bodyStr = new String(message.getBody());

            if (StringUtils.isEmpty(bodyStr))
                return Action.CommitMessage;

            JSONObject body = JSONObject.parseObject(bodyStr);
            log.info("got a websocket mq msg");
            WebSocketMqMsg.Body wsMqBody = JSON.toJavaObject(body, WebSocketMqMsg.Body.class);
            wsService.sendMessage(wsMqBody);

            return Action.CommitMessage;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("消费MQ消息失败,原因是:{}", e.getMessage());
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

收到各个模块的MQ消息后,提取出发送对象、发送内容,然后进行发送。如果没有找到对应客户端的连接,那么将抛弃掉该WS消息。

public Map<String, Session> sessionMap = new ConcurrentHashMap<>();
public void sendMessage(WebSocketMqMsg.Body message) {
    if (message.getIds().size() > 0) {
        for (Integer id : message.getIds()) {
            cachedThreadPool.execute(() -> {
                int maxIdx = message.getRole().equals(AccountType.PASSENGER.name()) ? 2 : 1;
                for (int i = 1; i <= maxIdx; i++) {

                    String key = String.format(Constants.CACHE_USER_TOKEN_LOGIN_PREFIX,
                            message.getRole(),
                            LoginType.valueOf((short) i), id);
                    log.info("key is {}", key);

                    String token = cacheService.getVal(key);
                    log.info("token is {}", token == null ? "null" : token);

                    boolean sendOfflineMsg = false;
                    if (!StringUtils.isEmpty(token)) {
                        Session session = sessionMap.get(token);
                        log.info("session is {}", session == null ? "null" : "not null");
                        if (session == null || !sendMessage(session,
                                message.getMsg().toString())) {
                            sendOfflineMsg = true;
                        }
                    } else {
                        sendOfflineMsg = true;
                    }
                    log.info("ws msg: role -> 【{}】, id -> 【{}】, terminal -> 【{}】, status -> 【{}】",
                            message.getRole(),
                            id,
                            i == 1 ? LoginType.APP.name() : LoginType.WECHAT_APPLET.name(),
                            sendOfflineMsg ? "offline" : "online");
                }
            });
        }
    }
}

public boolean sendMessage(Session session, String message) {
    session.getBasicRemote().sendPong();
    if (!session.isOpen()) {
        return false;
    }
    try {
        session.getBasicRemote().sendText(message);
    } catch (IOException e) {
        log.error("send message to {} error {}", session.getId(), e);
        return false;
    }
    return true;
}

后记

按照上述架构完成的多实例WS服务部署,可以解决前面提到的两个问题。MQ作为一个中间这的角色,发挥出了它的作用。

Read more

Volcano 与 Kubernetes GPU 调度学习笔记

本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。 目录 * 第一部分:Volcano 入门 * 1. Volcano 是什么 * 2. 安装与快速使用 * 3. 核心特性一览 * 第二部分:Volcano 整体架构 * 4. Volcano 解决的核心问题 * 5. 整体架构与数据流 * 6. 三层抽象模型 * 第三部分:Volcano 核心实现原理 * 7. Session 机制 * 8. Gang Scheduling 实现 * 9. Queue 与 DRF 公平调度

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd