翼度科技»论坛 编程开发 mysql 查看内容

并发情况如何实现加锁来保证数据一致性?

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
单体架构下锁的实现方案

1. ReentrantLock全局锁

ReentrantLock(可重入锁),指的是一个线程再次对已持有的锁保护的临界资源时,重入请求将会成功。
简单的与我们常用的Synchronized进行比较:
ReentrantLockSynchronized锁实现机制依赖AQS监视器模式灵活性支持响应超时、中断、尝试获取锁不灵活释放形式必须显示调用unlock()释放锁自动释放监视器锁类型公平锁 & 非公平锁非公平锁条件队列可关联多个条件队列关联一个条件队列可重入性可重入可重入AQS机制:如果被请求的共享资源空闲,那么就当前请求资源的线程设置为有效的工作线程,将共享资源通过CAScompareAndSetState设置为锁定状态;如果共享资源被占用,就采用一定的阻塞等待唤醒机制(CLH变体的FIFO双端队列)来保证锁分配。
可重入性:无论是公平锁还是非公平锁的情况,加锁过程会利用一个state值
  1. private volatile int state
复制代码

  • state值初始化的时候为0,表示没有任何线程持有锁
  • 当有线程来请求该锁时,state值会自增1,同一个线程多次获取锁,就会多次+1,这就是可重入的概念
  • 解锁也是对state值自减1,一直到0,此线程对锁释放。
  1. public class LockExample {
  2.     static int count = 0;
  3.     static ReentrantLock lock = new ReentrantLock();
  4.     public static void main(String[] args) throws InterruptedException {
  5.         Runnable runnable = new Runnable() {
  6.             @Override
  7.             public void run() {
  8.                 try {
  9.                     // 加锁
  10.                     lock.lock();
  11.                     for (int i = 0; i < 10000; i++) {
  12.                         count++;
  13.                     }
  14.                 } catch (Exception e) {
  15.                     e.printStackTrace();
  16.                 }
  17.                 finally {
  18.                     // 解锁,放在finally子句中,保证锁的释放
  19.                     lock.unlock();
  20.                 }
  21.             }
  22.         };
  23.         Thread thread1 = new Thread(runnable);
  24.         Thread thread2 = new Thread(runnable);
  25.         thread1.start();
  26.         thread2.start();
  27.         thread1.join();
  28.         thread2.join();
  29.         System.out.println("count: " + count);
  30.     }
  31. }
  32. /**
  33. * 输出
  34. * count: 20000
  35. */
复制代码
2. Mysql行锁、乐观锁

乐观锁即是无锁思想,一般都是基于CAS思想实现的,而在MySQL中通过version版本号 + CAS无锁形式实现乐观锁;例如T1,T2两个事务一起并发执行时,当T2事务执行成功提交后,会对version+1,所以T1事务执行的version条件就无法成立了。
对sql语句进行加锁以及状态机的操作,也可以避免不同线程同时对count值访问导致的数据不一致问题。
  1. // 乐观锁 + 状态机
  2. update
  3.     table_name
  4. set
  5.     version = version + 1,
  6.     count = count + 1
  7. where
  8.     id = id AND version = version AND count = [修改前的count值];
  9. // 行锁 + 状态机
  10. update
  11.     table_name
  12. set
  13.     count = count + 1
  14. where
  15.     id = id AND count = [修改前的count值]
  16. for update;
复制代码
3. 细粒度的ReetrantLock锁

如果我们直接采用ReentrantLock全局加锁,那么这种情况是一条线程获取到锁,整个程序全部的线程来到这里都会阻塞;但是我们在项目里面想要针对每个用户在操作的时候实现互斥逻辑,所以我们需要更加细粒度的锁。
  1. public class LockExample {
  2.     private static Map<String, Lock> lockMap = new ConcurrentHashMap<>();
  3.    
  4.     public static void lock(String userId) {
  5.         // Map中添加细粒度的锁资源
  6.         lockMap.putIfAbsent(userId, new ReentrantLock());
  7.         // 从容器中拿锁并实现加锁
  8.         lockMap.get(userId).lock();
  9.     }
  10.     public static void unlock(String userId) {
  11.         // 先从容器中拿锁,确保锁的存在
  12.         Lock locak = lockMap.get(userId);
  13.         // 释放锁
  14.         lock.unlock();
  15.     }
  16. }
复制代码
弊端:如果每一个用户请求共享资源,就会加锁一次,后续该用户就没有在登录过平台,但是锁对象会一直存在于内存中,这等价于发生了内存泄漏,所以锁的超时和淘汰机制机制需要实现。
4. 细粒度的Synchronized全局锁

上面的加锁机制使用到了锁容器ConcurrentHashMap,该容易为了线程安全的情况,多以底层还是会用到Synchronized机制,所以有些情况,使用lockMap需要加上两层锁。
那么我们是不是可以直接使用Synchronized来实现细粒度的锁机制
  1. public class LockExample {
  2.     public static void syncFunc1(Long accountId) {
  3.         String lock = new String(accountId + "").intern();
  4.         synchronized (lock) {
  5.             System.out.println(Thread.currentThread().getName() + "拿到锁了");
  6.             // 模拟业务耗时
  7.             try {
  8.                 Thread.sleep(1000);
  9.             } catch (InterruptedException e) {
  10.                 throw new RuntimeException(e);
  11.             }
  12.             System.out.println(Thread.currentThread().getName() + "释放锁了");
  13.         }
  14.     }
  15.     public static void syncFunc2(Long accountId) {
  16.         String lock = new String(accountId + "").intern();
  17.         synchronized (lock) {
  18.             System.out.println(Thread.currentThread().getName() + "拿到锁了");
  19.             // 模拟业务耗时
  20.             try {
  21.                 Thread.sleep(1000);
  22.             } catch (InterruptedException e) {
  23.                 throw new RuntimeException(e);
  24.             }
  25.             System.out.println(Thread.currentThread().getName() + "释放锁了");
  26.         }
  27.     }
  28.     // 使用 Synchronized 来实现更加细粒度的锁
  29.     public static void main(String[] args) {
  30.         new Thread(()-> syncFunc1(123456L), "Thread-1").start();
  31.         new Thread(()-> syncFunc2(123456L), "Thread-2").start();
  32.     }
  33. }
  34. /**
  35. * 打印
  36. * Thread-1拿到锁了
  37. * Thread-1释放锁了
  38. * Thread-2拿到锁了
  39. * Thread-2释放锁了
  40. */
复制代码

  • 从代码中我们发现实现加锁的对象其实就是一个与用户ID相关的一个字符串对象,这里可能会有疑问,我每一个新的线程进来,new的都是一个新的字符串对象,只不过字符串内容一样,怎么能够保证可以安全的锁住共享资源呢;
  • 这其实需要归功于后面的intern()函数的功能;
  • intern()函数用于在运行时将字符串添加到堆空间中的字符串常量池中,如果字符串已经存在,返回字符串常量池中的引用。
分布式架构下锁的实现方案

核心问题:我们需要找到一个多个进程之间所有线程可见的区域来定义这个互斥量。
一个优秀的分布式锁的实现方案应该满足如下几个特性:

  • 分布式环境下,可以保证不同进程之间的线程互斥
  • 同一时刻,同时只允许一条线程成功获取到锁资源
  • 保证互斥量的地方需要保证高可用性
  • 要保证可以高性能的获取锁和释放锁
  • 可以支持同一线程的锁重入性
  • 具备合理的阻塞机制,竞争锁失败的线程要有相应的处理方案
  • 支持非阻塞式的获取锁。获取锁失败的线程可以直接返回
  • 具备合理的锁失效机制,如超时失效等,可以确保避免死锁情况出现
Redis实现分布式锁


  • redis属于中间件,可独立部署;
  • 对于不同的Java进程来说都是可见的,同时性能也非常可观
  • 依赖与redis本身提供的指令setnx key value来实现分布式锁;区别于普通set指令的是只有当key不存在时才会设置成功,key存在时会返回设置失败
代码实例:
  1. // 扣库存接口
  2. @RequestMapping("/minusInventory")
  3. public String minusInventory(Inventory inventory) {
  4.     // 获取锁
  5.     String lockKey = "lock-" + inventory.getInventoryId();
  6.     int timeOut = 100;
  7.     Boolean flag = stringRedisTemplate.opsForValue()
  8.             .setIfAbsent(lockKey, "竹子-熊猫",timeOut,TimeUnit.SECONDS);
  9.     // 加上过期时间,可以保证死锁也会在一定时间内释放锁
  10.     stringRedisTemplate.expire(lockKey,timeOut,TimeUnit.SECONDS);
  11.    
  12.     if(!flag){
  13.         // 非阻塞式实现
  14.         return "服务器繁忙...请稍后重试!!!";
  15.     }
  16.    
  17.     // ----只有获取锁成功才能执行下述的减库存业务----        
  18.     try{
  19.         // 查询库存信息
  20.         Inventory inventoryResult =
  21.             inventoryService.selectByPrimaryKey(inventory.getInventoryId());
  22.         
  23.         if (inventoryResult.getShopCount() <= 0) {
  24.             return "库存不足,请联系卖家....";
  25.         }
  26.         
  27.         // 扣减库存
  28.         inventoryResult.setShopCount(inventoryResult.getShopCount() - 1);
  29.         int n = inventoryService.updateByPrimaryKeySelective(inventoryResult);
  30.     } catch (Exception e) { // 确保业务出现异常也可以释放锁,避免死锁
  31.         // 释放锁
  32.         stringRedisTemplate.delete(lockKey);
  33.     }
  34.    
  35.     if (n > 0)
  36.         return "端口-" + port + ",库存扣减成功!!!";
  37.     return "端口-" + port + ",库存扣减失败!!!";
  38. }
  39. 作者:竹子爱熊猫
  40. 链接:https://juejin.cn/post/7038473714970656775
复制代码
过期时间的合理性分析:
因为对于不同的业务,我们设置的过期时间的长短都会不一样,太长了不合适,太短了也不合适;
所以我们想到的解决方案是设置一条子线程,给当前锁资源续命。具体实现是,子线程间隔2-3s去查询一次key是否过期,如果还没有过期则代表业务线程还在执行业务,那么则为该key的过期时间加上5s。
但是为了避免主线程意外死亡后,子线程会一直为其续命,造成“长生锁”的现象,所以将子线程变为主(业务)线程的守护线程,这样子线程就会跟着主线程一起死亡。
[code]// 续命子线程public class GuardThread extends Thread {     private static boolean flag = true;    public GuardThread(String lockKey,         int timeOut, StringRedisTemplate stringRedisTemplate){        ……    }    @Override    public void run() {        // 开启循环续命        while (flag){            try {                // 先休眠一半的时间                Thread.sleep(timeOut / 2 * 1000);            }catch (Exception e){                e.printStackTrace();            }            // 时间过了一半之后再去续命            // 先查看key是否过期            Long expire = stringRedisTemplate.getExpire(                lockKey, TimeUnit.SECONDS);            // 如果过期了,代表主线程释放了锁            if (expire

举报 回复 使用道具