0
点赞
收藏
分享

微信扫一扫

redis实现分布式锁(完善版)

针对之前文章实现的redis分布式锁的方式,有几个问题,需要改进

谨记:Redis运行模式是单线程的

针对上面提出的几个问题,在改造一版逻辑更加完善,更加安全的Redis实现分布式锁

具体代码实现一下:

首先我们添加主要的pom依赖文件

        <!--redis客户端jedis的依赖,我们这里使用的是3.0以上的版本-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.0.1</version>
        </dependency>

        <!-- 单元测试的依赖 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

接下来我们编写spring配置类

package com.chuxin.example.redis.lock.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.*;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @FileName: SpringConfig
 * @Description: 配置类
 * @author: <a href="mailto: ****@163.com">myp</a>
 * @create: 2019-08-02 14:40
 */
@Configuration
@ComponentScan("com.chuxin.example.redis.lock")
public class SpringConfig {

    @Scope("prototype")
    @Lazy
    @Bean
    public Jedis jedis(JedisPool jedisPool){
        return jedisPool.getResource();
    }

    //配置一个连接池
    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(10);
        jedisPoolConfig.setMaxIdle(10);
        jedisPoolConfig.setMinIdle(1);
        jedisPoolConfig.setMaxWaitMillis(2000);
        jedisPoolConfig.setTestOnBorrow(true);
//        jedisPoolConfig.setTestOnReturn(false);
        jedisPoolConfig.setTestOnReturn(true);//生产环境必须使用true
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379);
        return jedisPool;
    }

}

编写一个创建线程的单例模式的工具类,这个作为守护线程使用

package com.chuxin.example.redis.lock.config;

/**
 * @FileName: ThreadUtil
 * @Description: 线程工具类
 * @author: <a href="mailto: ***@163.com">myp</a>
 * @create: 2019-08-02 14:49
 */
public class ThreadUtil {

    public static Thread thread;

    public static Thread newThread(Runnable runnable){
        if (thread == null) {
            synchronized (ThreadUtil.class) {
                if (thread == null) {
                    thread = new Thread(runnable);
                    thread.setDaemon(true);
                }
                return thread;
            }
        }
        return thread;
    }

}

接下来就是重点实现我们加锁和释放锁的关键代码了
重新定义lock接口

package com.chuxin.example.redis.lock.test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

/**
 * @FileName: Lock
 * @Description:
 * @author: <a href="mailto: muyuanpei@163.com">myp</a>
 * @create: 2019-08-02 15:13
 */
public interface Lock {

    //加锁
    void lock();

    void lockInterruptibly() throws InterruptedException;

    //尝试加锁
    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    //释放锁
    void unLock() throws Exception;

    Condition newCondition();
}

具体的锁功能实现

package com.chuxin.example.redis.lock.test;

import com.chuxin.example.redis.lock.config.ThreadUtil;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;

/**
 * @FileName: RedisLock
 * @Description: Redis锁
 * @author: <a href="mailto: muyuanpei@163.com">myp</a>
 * @create: 2019-08-02 14:00
 */
@Component
@Service("lock")
public class RedisLock implements Lock {

    @Resource
    private JedisPool jedisPool;

    private static final String key = "lock";

    private ThreadLocal<String> threadLocal = new ThreadLocal();

    private static AtomicBoolean isHappened = new AtomicBoolean(true);

    //加锁方法
    @Override
    public void lock() {
        boolean b = tryLock();
        if (b) {
            return ;
        } try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock();
    }


    //尝试加锁方法
    @Override
    public boolean tryLock() {
        //jedis3.0之后才提供的这个对象
        SetParams setParams = new SetParams();
        setParams.ex(1);//设置过期时间为2秒
        setParams.nx();
        String s = UUID.randomUUID().toString();
        Jedis resource = null;
        try {//这里捕获一下异常,如果获取资源时出现异常,那么将没有用到的资源放回资源池中
            resource = jedisPool.getResource();
        } catch (RuntimeException e) {
            if (resource != null) {
                resource.close();
            } else {
                System.out.println("资源池中资源不够用啦,在这里处理。。。阻塞等待或者丢弃请求");
                return false;
            }
        }

        //加锁,同时设置value值、过期时间。如果返回ok就说明之前这个key是不存在的,现在已经添加上了
        String lock = resource.set(key,s,setParams);

        //jedis3.0之前使用下面这行代码
//        String lock = resource.set(key,s,"NX","PX",5000);
        
        resource.close();
        if ("OK".equals(lock)) {
            //获取到了锁
            threadLocal.set(s);
            if (isHappened.get()){
                ThreadUtil.newThread(new MyRunnable(jedisPool)).start();
                isHappened.set(false);
            }
            return true;
        }
        return false;
    }

    @Override
    public void unLock() throws Exception {

        //解析:注释的这段代码和下面的代码都是释放锁用的。上面的由原子性问题。下面的是使用lua表达式解决了lua表达式的。
//        Jedis resource = jedisPool.getResource();
//        if (resource.get(key).equals(threadLocal.get())) {
//            Long del = resource.del(key);
//            if (del == 0) {
//                resource.close();
//            throw new Exception("解锁失败!");
//            }
//        }
//        resource.close();



        String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        Jedis resource = jedisPool.getResource();
        Object eval = resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));
        if (Integer.valueOf(eval.toString()) == 0) {
            resource.close();
            throw new Exception("解锁失败!");
        } else {
            resource.close();
        }
    }

    public class MyRunnable implements Runnable{
        private JedisPool jedisPool;
        public MyRunnable(JedisPool jedisPool){
            this.jedisPool = jedisPool;
        }
        @Override
        public void run() {
            Jedis jedis = jedisPool.getResource();
            while(true){
                Long ttl = jedis.ttl(key);
                if (ttl != null && ttl > 0) {
                    jedis.expire(key,(int)(ttl+1));//单位s
                }
                try {
                    Thread.sleep(1000);
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }


}

最后我们写一个Demo测试一下我们的代码:

package com.chuxin.example.redis.lock.test;

import com.chuxin.example.redis.lock.config.SpringConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;

/**
 * @FileName: Demo
 * @Description: 单元测试demo
 * @author: <a href="mailto: muyuanpei@163.com">myp</a>
 * @create: 2019-08-02 14:56
 */
@ContextConfiguration(classes = SpringConfig.class)
@RunWith(SpringRunner.class)
@WebAppConfiguration
public class Demo {

    private static int count = 1000;

    @Autowired
    @Qualifier("lock")
    private Lock lock;

    @Test
    public void Test() throws InterruptedException{
        TicketRunBle ticketRunBle = new TicketRunBle();
        Thread thread1 = new Thread(ticketRunBle, "窗口1");
        Thread thread2 = new Thread(ticketRunBle, "窗口2");
        Thread thread3 = new Thread(ticketRunBle, "窗口3");
        Thread thread4 = new Thread(ticketRunBle, "窗口4");
        Thread thread5 = new Thread(ticketRunBle, "窗口5");
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
        Thread.currentThread().join();
    }

    public class TicketRunBle implements Runnable{

        @Override
        public void run(){
            while (count > 0) {
                //调用加锁方法
                lock.lock();
                try {
                    if (count > 0) {
                        System.out.println(Thread.currentThread().getName()+"售出第"+count--+"张票");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        //调用释放锁方法
                        lock.unLock();
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(50);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

最终就打印出来了多线程并发情况下,这1000张票出票的记录,不会出现重复出票的现象。。。


2020年07月19日补充更新:
如果redis我们使用主从模式实现的情况下,这样就会出现一个问题:如果redis主节点突然宕机怎么办?有的小伙伴会说,在主节点宕机之后,使用从节点就可以了,但是我们知道redis的主从复制是异步执行的,现在如果A获取锁之后,执行正常的业务处理,这时主节点中A获取的key还未同步到从节点上,主节点突然宕机,这时使用从节点,但是从节点中并没有A获取的key信息,这样B节点此时来获取锁,那么就会出现A和B同时获取到锁。如何解决这个问题呢???

答案:第一种方案:我们上面的实现方式其实是可以解决这个问题的,因为我们释放锁的时候是存在一次校验的,如果校验失败,我们可以做业务数据回滚,也就是如果上面问题发生时,A的业务就是执行不成功的,实际上执行不成功的原因就是由于主节点宕机导致的,这样可以解决问题,同时也能保证数据正确性;

举报

相关推荐

0 条评论