一种基于redis的分布式锁的实现(当前项目中在使用)

主要思想是利用redis执行命令时的单线程特性。

/**
 * 分布式锁
 */
@Component
public class DistributedLock {

    public static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    /**
     * 加锁默认超时时间
     */
    private static final long DEFAULT_TIMEOUT_SECOND = 5;
    /**
     * 获取所等待的时间
     */
    private static final long DEFAULT_WAITE_TIMEOUT_SECOND = 5;
    /**
     * 加锁循环等待时间
     */
    private static final long LOOP_WAIT_TIME_MILLISECOND = 30;
    @Autowired
    @Qualifier("redisCacheService")
    private BaseCacheService cacheService;

    /**
     * 有超时等待的加锁
     *
     * @param timeoutSecond      如果为null,使用默认超时时间
     * @param waiteTimeoutSecond 若果为null,使用默认超时时间
     * @return 加锁的值(超时时间:-1表示获取失败,超时)
     */
    public long lock(String key, Long timeoutSecond, Long waiteTimeoutSecond) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start lock");
        long beganTime = System.currentTimeMillis() / 1000;
        //如果参数错误
        if (timeoutSecond != null && timeoutSecond <= 0) {
            timeoutSecond = DEFAULT_TIMEOUT_SECOND;
        }
        timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
        if (waiteTimeoutSecond != null && waiteTimeoutSecond <= 0) {
            waiteTimeoutSecond = DEFAULT_WAITE_TIMEOUT_SECOND;
        }
        waiteTimeoutSecond =
                waiteTimeoutSecond == null ? DEFAULT_WAITE_TIMEOUT_SECOND : waiteTimeoutSecond;
        while (true) {
            //等待超时判断
            long endTime = System.currentTimeMillis() / 1000;
            if ((endTime - beganTime) >= waiteTimeoutSecond) {
                return -1l;
            }
            //超时时间点
            long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

            //如果设置成功
            if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
                logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                return timeoutTimeMilli;
            }

            //如果已经超时
            Long value = cacheService.getVal(key, Long.class);
            if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {

                //设置新的超时时间
                Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);

                //多个线程同时getset,只有第一个才可以获取到锁
                if (value.equals(oldValue)) {
                    logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                    return timeoutTimeMilli;
                }
            }

            //延迟一定毫秒,防止请求太频繁
            try {
                Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
            } catch (InterruptedException e) {
                logger.error("DistributedLock lock sleep error", e);
            }
        }
    }

    /**
     * 无超时等待的加锁
     *
     * @param timeoutSecond 如果为null,使用默认超时时间
     * @return 加锁的值(超时时间)
     */
    public long lock(String key, Long timeoutSecond) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start lock");

        //如果参数错误
        if (timeoutSecond != null && timeoutSecond <= 0) {
            timeoutSecond = DEFAULT_TIMEOUT_SECOND;
        }
        timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
        while (true) {
            //超时时间点
            long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

            //如果设置成功
            // 若在redis中、没有相应的key值,那么可以认为,当前线程即或得该锁。
            if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
                logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                return timeoutTimeMilli;
            }

            //如果已经超时
            // 此时该key在redis已存在,获取该key的值
            Long value = cacheService.getVal(key, Long.class);
            if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {

                //设置新的超时时间
                // 如果此时获取到的oldValue,与前面获取的value相同,则说明该线程可以获得锁
                // 获得锁后,set进新值。
                Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);

                //多个线程同时getset,只有第一个才可以获取到锁
                if (value.equals(oldValue)) {
                    logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                    return timeoutTimeMilli;
                }
            }

            //延迟一定毫秒,防止请求太频繁
            try {
                Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
            } catch (InterruptedException e) {
                logger.error("DistributedLock lock sleep error", e);
            }
        }
    }

    /**
     * 释放锁
     */
    public void unLock(String key, long lockValue) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start unlock");
        Long value = cacheService.getVal(key, Long.class);
        if (value != null && value.equals(lockValue)) {//如果是本线程加锁
            cacheService.deleteVal(key);
            logger.info("Thread:" + Thread.currentThread().getName() + " unlock success");
        }
    }
}

Read more

Volcano 与 Kubernetes GPU 调度学习笔记

本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。 目录 * 第一部分:Volcano 入门 * 1. Volcano 是什么 * 2. 安装与快速使用 * 3. 核心特性一览 * 第二部分:Volcano 整体架构 * 4. Volcano 解决的核心问题 * 5. 整体架构与数据流 * 6. 三层抽象模型 * 第三部分:Volcano 核心实现原理 * 7. Session 机制 * 8. Gang Scheduling 实现 * 9. Queue 与 DRF 公平调度

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd