翼度科技»论坛 云主机 服务器技术 查看内容

rocketmq-streams的ILeaseService使用示例详解

9

主题

9

帖子

27

积分

新手上路

Rank: 1

积分
27


本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService
  1. /**
  2. * 通过db实现租约和锁,可以更轻量级,减少其他中间件的依赖 使用主备场景,只有一个实例运行,当当前实例挂掉,在一定时间内,会被其他实例接手 也可以用于全局锁
  3. */
  4. public interface ILeaseService {
  5.     /**
  6.      * 默认锁定时间
  7.      */
  8.     static final int DEFALUT_LOCK_TIME = 60 * 5;
  9.     /**
  10.      * 检查某用户当前时间是否具有租约。这个方法是纯内存操作,无性能开销
  11.      *
  12.      * @return true,租约有效;false,租约无效
  13.      */
  14.     boolean hasLease(String name);
  15.     /**
  16.      * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
  17.      *
  18.      * @param name 租约名称,无特殊要求,相同名称会竞争租约
  19.      */
  20.     void startLeaseTask(String name);
  21.     /**
  22.      * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
  23.      *
  24.      * @param name     租约名称,无特殊要求,相同名称会竞争租约
  25.      * @param callback 当第一获取租约时,回调此函数
  26.      */
  27.     void startLeaseTask(final String name, ILeaseGetCallback callback);
  28.     /**
  29.      * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
  30.      *
  31.      * @param name            租约名称,无特殊要求,相同名称会竞争租约
  32.      * @param leaseTermSecond 租期,在租期内可以做业务处理,单位是秒
  33.      * @param callback        当第一获取租约时,回调此函数
  34.      */
  35.     void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);
  36.     /**
  37.      * 申请锁,无论成功与否,立刻返回。如果不释放,最大锁定时间是5分钟
  38.      *
  39.      * @param name       业务名称
  40.      * @param lockerName 锁名称
  41.      * @return 是否枷锁成功
  42.      */
  43.     boolean lock(String name, String lockerName);
  44.     /**
  45.      * 申请锁,无论成功与否,立刻返回。默认锁定时间是5分钟
  46.      *
  47.      * @param name           业务名称
  48.      * @param lockerName     锁名称
  49.      * @param lockTimeSecond 如果不释放,锁定的最大时间,单位是秒
  50.      * @return 是否枷锁成功
  51.      * @return
  52.      */
  53.     boolean lock(String name, String lockerName, int lockTimeSecond);
  54.     /**
  55.      * 申请锁,如果没有则等待,等待时间可以指定,如果是-1 则无限等待。如果不释放,最大锁定时间是5分钟
  56.      *
  57.      * @param name       业务名称
  58.      * @param lockerName 锁名称
  59.      * @param waitTime   没获取锁时,最大等待多长时间,如果是-1 则无限等待
  60.      * @return 是否枷锁成功
  61.      */
  62.     boolean tryLocker(String name, String lockerName, long waitTime);
  63.     /**
  64.      * 申请锁,如果没有则等待,等待时间可以指定,如果是-1 则无限等待。如果不释放,最大锁定时间是lockTimeSecond
  65.      *
  66.      * @param name           业务名称
  67.      * @param lockerName     锁名称
  68.      * @param waitTime       没获取锁时,最大等待多长时间,如果是-1 则无限等待
  69.      * @param lockTimeSecond 如果不释放,锁定的最大时间,单位是秒
  70.      * @return 是否枷锁成功
  71.      */
  72.     boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);
  73.     /**
  74.      * 释放锁
  75.      *
  76.      * @param name
  77.      * @param lockerName
  78.      * @return
  79.      */
  80.     boolean unlock(String name, String lockerName);
  81.     /**
  82.      * 对于已经获取锁的,可以通过这个方法,一直持有锁。 和租约的区别是,当释放锁后,无其他实例抢占。无法实现主备模式
  83.      *
  84.      * @param name           业务名称
  85.      * @param lockerName     锁名称
  86.      * @param lockTimeSecond 租期,这个方法会自动续约,如果不主动释放,会一直持有锁
  87.      * @return 是否成功获取锁
  88.      */
  89.     boolean holdLock(String name, String lockerName, int lockTimeSecond);
  90.     /**
  91.      * 是否持有锁,不会申请锁。如果以前申请过,且未过期,返回true,否则返回false
  92.      *
  93.      * @param name       业务名称
  94.      * @param lockerName 锁名称
  95.      * @return
  96.      */
  97.     boolean hasHoldLock(String name, String lockerName);
  98.     List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);
  99. }
复制代码

  • ILeaseService接口定义了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法

BasedLesaseImpl
  1. public abstract class BasedLesaseImpl implements ILeaseService {
  2.     private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);
  3.     private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
  4.     private static final AtomicBoolean syncStart = new AtomicBoolean(false);
  5.     private static final int synTime = 120;  // 5分钟的一致性hash同步时间太久了,改为2分钟
  6.     protected ScheduledExecutorService taskExecutor = null;
  7.     protected int leaseTerm = 300 * 2;                                  // 租约时间
  8.     // protected transient JDBCDriver jdbcDataSource = null;
  9.     protected ILeaseStorage leaseStorage;
  10.     protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>();    // 每个lease name对应的租约到期时间
  11.     public BasedLesaseImpl() {
  12.         taskExecutor = new ScheduledThreadPoolExecutor(10);
  13.     }
  14.     /**
  15.      * lease_name: consistent_hash_ip, lease_user_ip: ip,定时刷新lease_info表,检查一致性hash环的节点情况
  16.      *
  17.      * @param name
  18.      * @return
  19.      */
  20.     @Override
  21.     public boolean hasLease(String name) {
  22.         // 内存中没有租约信息则表示 没有租约
  23.         Date leaseEndTime = leaseName2Date.get(name);
  24.         if (leaseEndTime == null) {
  25.             // LOG.info("内存中根据 " + name + "没有查询到租约信息,表示没有租约");
  26.             return false;
  27.         }
  28.         // LOG.info("查询是否有租约 name:" + name + " ,当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
  29.         // + " 租约到期时间 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
  30.         // 有租约时间,并且租约时间大于当前时间,表示有租约信息
  31.         if (new Date().before(leaseEndTime)) {
  32.             return true;
  33.         }
  34.         return false;
  35.     }
  36.     private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();
  37.     @Override
  38.     public void startLeaseTask(final String name) {
  39.         startLeaseTask(name, this.leaseTerm, null);
  40.     }
  41.     @Override
  42.     public void startLeaseTask(final String name, ILeaseGetCallback callback) {
  43.         startLeaseTask(name, this.leaseTerm, callback);
  44.     }
  45.     @Override
  46.     public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
  47.         ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
  48.         startLeaseTask(name, applyTask, leaseTerm / 2, true);
  49.     }
  50.     /**
  51.      * 启动定时器,定时执行任务,确保任务可重入
  52.      *
  53.      * @param name
  54.      * @param runnable     具体任务
  55.      * @param scheduleTime 调度时间
  56.      * @param startNow     是否立刻启动一次
  57.      */
  58.     protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
  59.         AtomicBoolean isStartLease = startLeaseMap.get(name);//多次调用,只启动一次定时任务
  60.         if (isStartLease == null) {
  61.             synchronized (this) {
  62.                 isStartLease = startLeaseMap.get(name);
  63.                 if (isStartLease == null) {
  64.                     isStartLease = new AtomicBoolean(false);
  65.                     startLeaseMap.put(name, isStartLease);
  66.                 }
  67.             }
  68.         }
  69.         if (isStartLease.compareAndSet(false, true)) {
  70.             if (startNow) {
  71.                 runnable.run();
  72.             }
  73.             taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
  74.         }
  75.     }
  76.     //......
  77. }
复制代码

  • BasedLesaseImpl声明实现了ILeaseService,它依赖ILeaseStorage,startLeaseTask方法会创建ApplyTask,然后以固定间隔调度执行

ApplyTask
  1. /**
  2.      * 续约任务
  3.      */
  4.     protected class ApplyTask implements Runnable {
  5.         protected String name;
  6.         protected int leaseTerm;
  7.         protected ILeaseGetCallback callback;
  8.         public ApplyTask(int leaseTerm, String name) {
  9.             this(leaseTerm, name, null);
  10.         }
  11.         public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
  12.             this.name = name;
  13.             this.leaseTerm = leaseTerm;
  14.             this.callback = callback;
  15.         }
  16.         @Override
  17.         public void run() {
  18.             try {
  19.                 // LOG.info("LeaseServiceImpl name: " + name + "开始获取租约...");
  20.                 AtomicBoolean newApplyLease = new AtomicBoolean(false);
  21.                 Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
  22.                 if (leaseDate != null) {
  23.                     leaseName2Date.put(name, leaseDate);
  24.                     LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 获取租约成功, 租约到期时间为 "
  25.                         + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
  26.                 } else {
  27.                     // fix.2020.08.13 这时name对应的租约可能还在有效期内,或者本机还持有租约,需要remove
  28.                     //  leaseName2Date.remove(name);
  29.                     LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 获取租约失败 ");
  30.                 }
  31.                 if (newApplyLease.get() &amp;&amp; callback != null) {
  32.                     callback.callback(leaseDate);
  33.                 }
  34.             } catch (Exception e) {
  35.                 LOG.error(" LeaseServiceImpl name: " + name + "  " + getSelfUser() + " 获取租约出现异常 ", e);
  36.             }
  37.         }
  38.     }
  39.     /**
  40.      * 申请租约,如果当期租约有效,直接更新一个租约周期,如果当前租约无效,先查询是否有有效的租约,如果有申请失败,否则直接申请租约
  41.      */
  42.     protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {
  43.         // 计算下一次租约时间 = 当前时间 + 租约时长
  44.         Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);
  45.         // 1 如果已经有租约,则更新租约时间(内存和数据库)即可
  46.         if (hasLease(name)) {
  47.             // LOG.info("用户已有租约,更新数据库和内存中的租约信息");
  48.             // 更新数据库
  49.             LeaseInfo leaseInfo = queryValidateLease(name);
  50.             if (leaseInfo == null) {
  51.                 LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
  52.                 return null;
  53.             }
  54.             // fix.2020.08.13,与本机ip相等且满足一致性hash分配策略,才续约,其他情况为null
  55.             String leaseUserIp = leaseInfo.getLeaseUserIp();
  56.             if (!leaseUserIp.equals(getSelfUser())) {
  57.                 return null;
  58.             }
  59.             leaseInfo.setLeaseEndDate(nextLeaseDate);
  60.             updateLeaseInfo(leaseInfo);
  61.             return nextLeaseDate;
  62.         }
  63.         // 2 没有租约情况 判断是否可以获取租约,只要租约没有被其他人获取,则说明有有效租约
  64.         boolean success = canGetLease(name);
  65.         if (!success) { // 表示被其他机器获取到了有效的租约
  66.             // LOG.info("其他机器获取到了有效的租约");
  67.             return null;
  68.         }
  69.         // 3 没有租约而且可以获取租约的情况,则尝试使用数据库原子更新的方式获取租约,保证只有一台机器成功获取租约,而且可以运行
  70.         boolean flag = tryGetLease(name, nextLeaseDate);
  71.         if (flag) { // 获取租约成功
  72.             newApplyLease.set(true);
  73.             return nextLeaseDate;
  74.         }
  75.         return null;
  76.     }
复制代码

  • ApplyTask内部调用applyLeaseTask,如果已有租约则更新租约时间,没有租约则判断是否可以获取租约,可以则执行tryGetLease

tryGetLease
  1. /**
  2.      * 更新数据库,占用租期并更新租期时间
  3.      *
  4.      * @param time
  5.      */
  6.     protected boolean tryGetLease(String name, Date time) {
  7.         // LOG.info("尝试获取租约 lease name is : " + name + " 下次到期时间: "
  8.         // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
  9.         LeaseInfo validateLeaseInfo = queryValidateLease(name);
  10.         if (validateLeaseInfo == null) {// 这里有两种情况 1 数据库里面没有租约信息 2 数据库里面有租约信息但是已经过期
  11.             Integer count = countLeaseInfo(name);
  12.             if (count == null || count == 0) {// 表示现在数据库里面没有任何租约信息,插入租约成功则表示获取成功,失败表示在这一时刻其他机器获取了租约
  13.                 // LOG.info("数据库中暂时没有租约信息,尝试原子插入租约:" + name);
  14.                 // fix.2020.08.13,经过一致性hash计算,该名字的任务不应该在本机执行,直接返回,无需插入。只有分配到hash执行权限的机器才可以插入并获取租约
  15.                 if (!getSelfUser().equals(getConsistentHashHost(name))) {
  16.                     return false;
  17.                 }
  18.                 validateLeaseInfo = new LeaseInfo();
  19.                 validateLeaseInfo.setLeaseName(name);
  20.                 validateLeaseInfo.setLeaseUserIp(getSelfUser());
  21.                 validateLeaseInfo.setLeaseEndDate(time);
  22.                 validateLeaseInfo.setStatus(1);
  23.                 validateLeaseInfo.setVersion(1);
  24.                 if (insert(validateLeaseInfo)) {
  25.                     LOG.info("数据库中暂时没有租约信息,原子插入成功,获取租约成功:" + name);
  26.                     return true;
  27.                 } else {
  28.                     LOG.info("数据库中暂时没有租约信息,原子插入失败,已经被其他机器获取租约:" + name);
  29.                     return false;
  30.                 }
  31.             } else { // 表示数据库里面有一条但是无效,这里需要两台机器按照version进行原子更新,更新成功的获取租约
  32.                 // LOG.info("数据库中有一条无效的租约信息,尝试根据版本号去原子更新租约信息:" + name);
  33.                 LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
  34.                 if (inValidateLeaseInfo == null) {// 说明这个时候另外一台机器获取成功了
  35.                     LOG.info("另外一台机器获取成功了租约:" + name);
  36.                     return false;
  37.                 }
  38.                 // fix.2020.08.13,机器重启之后,该名字的任务已经不分配在此机器上执行,直接返回,无需更新数据库
  39.                 if (!getSelfUser().equals(getConsistentHashHost(name))) {
  40.                     return false;
  41.                 }
  42.                 inValidateLeaseInfo.setLeaseName(name);
  43.                 inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
  44.                 inValidateLeaseInfo.setLeaseEndDate(time);
  45.                 inValidateLeaseInfo.setStatus(1);
  46.                 boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
  47.                 if (success) {
  48.                     LOG.info("LeaseServiceImpl 原子更新租约成功,当前机器获取到了租约信息:" + name);
  49.                 } else {
  50.                     LOG.info("LeaseServiceImpl 原子更新租约失败,租约被其他机器获取:" + name);
  51.                 }
  52.                 return success;
  53.             }
  54.         } else { // 判断是否是自己获取了租约,如果是自己获取了租约则更新时间(内存和数据库),
  55.             // 这里是为了解决机器重启的情况,机器重启,内存中没有租约信息,但是实际上该用户是有租约权限的
  56.             // fix.2020.08.13,租约的ip与本机ip相等,且满足一致性hash策略,才会被本机执行
  57.             String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
  58.             if (leaseUserIp.equals(getSelfUser())) {
  59.                 // 如果当期用户有租约信息,则更新数据库
  60.                 validateLeaseInfo.setLeaseEndDate(time);
  61.                 boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
  62.                 if (hasUpdate) {
  63.                     LOG.info(
  64.                         "LeaseServiceImpl机器重启情况,当前用户有租约信息,并且更新数据库成功,租约信息为 name :" + validateLeaseInfo.getLeaseName()
  65.                             + " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期时间 : " + new SimpleDateFormat(
  66.                             "yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
  67.                     return true;
  68.                 } else {
  69.                     LOG.info("LeaseServiceImpl 机器重启情况,当前用户有租约信息,并且更新数据库失败,表示失去租约:" + name);
  70.                     return false;
  71.                 }
  72.             }
  73.             // LOG.info("LeaseServiceImpl 租约被其他机器获取,租约信息为 name :" + validateLeaseInfo.getLeaseName() + " ip : "
  74.             // + validateLeaseInfo.getLeaseUserIp() + " 到期时间 : "
  75.             // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
  76.             return false;
  77.         }
  78.     }
  79.     protected LeaseInfo queryValidateLease(String name) {
  80.         //String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
  81.         //// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
  82.         //return queryLease(name, sql);
  83.         return leaseStorage.queryValidateLease(name);
  84.     }
复制代码

  • tryGetLease先通过queryValidateLease查询租约信息,若没有租约则插入,若过期则根据版本号更新,若已有租约则判断是否是自己获取了租约,是则更新租约信息

LeaseServiceImpl
  1. public class LeaseServiceImpl extends BasedLesaseImpl {
  2.     private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);
  3.     private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();
  4.     protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
  5.     //如果是抢占锁状态中,则不允许申请锁
  6.     public LeaseServiceImpl() {
  7.         super();
  8.     }
  9.     /**
  10.      * 尝试获取锁,可以等待waitTime,如果到点未返回,则直接返回。如果是-1,则一直等待
  11.      *
  12.      * @param name       业务名称
  13.      * @param lockerName 锁名称
  14.      * @param waitTime   等待时间,是微秒单位
  15.      * @return
  16.      */
  17.     @Override
  18.     public boolean tryLocker(String name, String lockerName, long waitTime) {
  19.         return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME);
  20.     }
  21.     @Override
  22.     public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
  23.         long now = System.currentTimeMillis();
  24.         boolean success = lock(name, lockerName, lockTimeSecond);
  25.         while (!success) {
  26.             if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) {
  27.                 break;
  28.             }
  29.             success = lock(name, lockerName, lockTimeSecond);
  30.             if (success) {
  31.                 return success;
  32.             }
  33.             try {
  34.                 Thread.sleep(100);
  35.             } catch (InterruptedException e) {
  36.                 LOG.error("LeaseServiceImpl try locker error", e);
  37.             }
  38.         }
  39.         return success;
  40.     }
  41.     @Override
  42.     public boolean lock(String name, String lockerName) {
  43.         return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME);
  44.     }
  45.     @Override
  46.     public boolean lock(String name, String lockerName, int leaseSecond) {
  47.         lockerName = createLockName(name, lockerName);
  48.         Future future = seizeLockingFuntures.get(lockerName);
  49.         if (future != null && ((HoldLockFunture)future).isDone == false) {
  50.             return false;
  51.         }
  52.         Date nextLeaseDate =
  53.             DateUtil.addSecond(new Date(), leaseSecond);// 默认锁定5分钟,用完需要立刻释放.如果时间不同步,可能导致锁失败
  54.         return tryGetLease(lockerName, nextLeaseDate);
  55.     }
  56.     @Override
  57.     public boolean unlock(String name, String lockerName) {
  58.         // LOG.info("LeaseServiceImpl unlock,name:" + name);
  59.         lockerName = createLockName(name, lockerName);
  60.         LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
  61.         if (validateLeaseInfo == null) {
  62.             LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
  63.         }
  64.         if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
  65.             validateLeaseInfo.setStatus(0);
  66.             updateDBLeaseInfo(validateLeaseInfo);
  67.         }
  68.         HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
  69.         if (holdLockTask != null) {
  70.             holdLockTask.close();
  71.         }
  72.         leaseName2Date.remove(lockerName);
  73.         return false;
  74.     }
  75.     /**
  76.      * 如果有锁,则一直持有,如果不能获取,则结束。和租约不同,租约是没有也会尝试重试,一备对方挂机,自己可以接手工作
  77.      *
  78.      * @param name
  79.      * @param secondeName
  80.      * @param lockTimeSecond 获取锁的时间
  81.      * @return
  82.      */
  83.     @Override
  84.     public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
  85.         if (hasHoldLock(name, secondeName)) {
  86.             return true;
  87.         }
  88.         synchronized (this) {
  89.             if (hasHoldLock(name, secondeName)) {
  90.                 return true;
  91.             }
  92.             String lockerName = createLockName(name, secondeName);
  93.             Date nextLeaseDate =
  94.                 DateUtil.addSecond(new Date(), lockTimeSecond);
  95.             boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的时间是leaseTerm
  96.             if (!success) {
  97.                 return false;
  98.             }
  99.             leaseName2Date.put(lockerName, nextLeaseDate);
  100.             if (!holdLockTasks.containsKey(lockerName)) {
  101.                 HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
  102.                 holdLockTask.start();
  103.                 holdLockTasks.putIfAbsent(lockerName, holdLockTask);
  104.             }
  105.         }
  106.         return true;
  107.     }
  108.     /**
  109.      * 是否持有锁,不访问数据库,直接看本地
  110.      *
  111.      * @param name
  112.      * @param secondeName
  113.      * @return
  114.      */
  115.     @Override
  116.     public boolean hasHoldLock(String name, String secondeName) {
  117.         String lockerName = createLockName(name, secondeName);
  118.         return hasLease(lockerName);
  119.     }
  120.     @Override
  121.     public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
  122.         String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
  123.         return queryValidateLeaseByNamePrefix(leaseNamePrefix);
  124.     }
  125.     //......
  126. }
复制代码

  • LeaseServiceImpl继承了BasedLesaseImpl,tryLocker方法会根据等待时间循环执行lock,lock方法则执行tryGetLease,unlock方法则更新租约信息,同时移除内存记录;holdLock则通过hasHoldLock判断是否持有锁,若有则返回,没有则执行tryGetLease

ILeaseStorage
  1. public interface ILeaseStorage {
  2.     /**
  3.      * 更新lease info,需要是原子操作,存储保障多线程操作的原子性
  4.      *
  5.      * @param leaseInfo 租约表数据
  6.      * @return
  7.      */
  8.     boolean updateLeaseInfo(LeaseInfo leaseInfo);
  9.     /**
  10.      * 统计这个租约名称下,LeaseInfo对象个数
  11.      *
  12.      * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
  13.      * @return
  14.      */
  15.     Integer countLeaseInfo(String leaseName);
  16.     /**
  17.      * 查询无效的的租约
  18.      *
  19.      * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
  20.      * @return
  21.      */
  22.     LeaseInfo queryInValidateLease(String leaseName);
  23.     /**
  24.      * 查询有效的的租约
  25.      *
  26.      * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
  27.      * @return
  28.      */
  29.     LeaseInfo queryValidateLease(String leaseName);
  30.     /**
  31.      * 按前缀查询有效的租约信息
  32.      *
  33.      * @param namePrefix
  34.      * @return
  35.      */
  36.     List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);
  37.     /**
  38.      * 增加租约
  39.      *
  40.      * @param leaseInfo 租约名称,无特殊要求,相同名称会竞争租约
  41.      */
  42.     void addLeaseInfo(LeaseInfo leaseInfo);
  43. }
复制代码

  • ILeaseStorage接口定义了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法

DBLeaseStorage
  1. public class DBLeaseStorage implements ILeaseStorage {
  2.     private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
  3.     protected JDBCDriver jdbcDataSource;
  4.     private String url;
  5.     protected String userName;
  6.     protected String password;
  7.     protected String jdbc;
  8.     public DBLeaseStorage(String jdbc, String url, String userName, String password) {
  9.         this.jdbc = jdbc;
  10.         this.url = url;
  11.         this.userName = userName;
  12.         this.password = password;
  13.         jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
  14.     }
  15.     @Override
  16.     public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
  17.         String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
  18.         String whereSQL = " WHERE id=#{id} and version=#{version}";
  19.         if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
  20.             sql += ",lease_name=#{leaseName}";
  21.         }
  22.         if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
  23.             sql += ",lease_user_ip=#{leaseUserIp}";
  24.         }
  25.         if (leaseInfo.getLeaseEndDate() != null) {
  26.             sql += ",lease_end_time=#{leaseEndDate}";
  27.         }
  28.         sql += whereSQL;
  29.         sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
  30.         try {
  31.             int count = getOrCreateJDBCDataSource().update(sql);
  32.             boolean success = count > 0;
  33.             if (success) {
  34.                 synchronized (this) {
  35.                     leaseInfo.setVersion(leaseInfo.getVersion() + 1);
  36.                 }
  37.             } else {
  38.                 System.out.println(count);
  39.             }
  40.             return success;
  41.         } catch (Exception e) {
  42.             LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
  43.             throw new RuntimeException("execute sql error " + sql, e);
  44.         }
  45.     }
  46.     @Override
  47.     public Integer countLeaseInfo(String leaseName) {
  48.         String sql = "SELECT count(*) as c FROM lease_info  WHERE lease_name = '" + leaseName + "' and status = 1";
  49.         try {
  50.             List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
  51.             if (rows == null || rows.size() == 0) {
  52.                 return null;
  53.             }
  54.             Long value = (Long) rows.get(0).get("c");
  55.             return value.intValue();
  56.         } catch (Exception e) {
  57.             throw new RuntimeException("execute sql error " + sql, e);
  58.         }
  59.     }
  60.     @Override
  61.     public LeaseInfo queryInValidateLease(String leaseName) {
  62.         String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
  63.         LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
  64.         return queryLease(leaseName, sql);
  65.     }
  66.     @Override
  67.     public LeaseInfo queryValidateLease(String leaseName) {
  68.         String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
  69.         return queryLease(leaseName, sql);
  70.     }
  71.     @Override
  72.     public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
  73.         String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
  74.         try {
  75.             List<LeaseInfo> leaseInfos = new ArrayList<>();
  76.             List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
  77.             if (rows == null || rows.size() == 0) {
  78.                 return null;
  79.             }
  80.             for (Map<String, Object> row : rows) {
  81.                 LeaseInfo leaseInfo = convert(row);
  82.                 leaseInfos.add(leaseInfo);
  83.             }
  84.             return leaseInfos;
  85.         } catch (Exception e) {
  86.             throw new RuntimeException("execute sql error " + sql, e);
  87.         }
  88.     }
  89.     @Override
  90.     public void addLeaseInfo(LeaseInfo leaseInfo) {
  91.         String sql =
  92.             " REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
  93.                 + " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
  94.         sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
  95.         try {
  96.             getOrCreateJDBCDataSource().execute(sql);
  97.         } catch (Exception e) {
  98.             LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
  99.             throw new RuntimeException("execute sql error " + sql, e);
  100.         }
  101.     }
  102.     protected JDBCDriver getOrCreateJDBCDataSource() {
  103.         if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
  104.             synchronized (this) {
  105.                 if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
  106.                     this.jdbcDataSource =
  107.                         DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
  108.                 }
  109.             }
  110.         }
  111.         return jdbcDataSource;
  112.     }
  113.     protected LeaseInfo queryLease(String name, String sql) {
  114.         try {
  115.             List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
  116.             if (rows == null || rows.size() == 0) {
  117.                 return null;
  118.             }
  119.             return convert(rows.get(0));
  120.         } catch (Exception e) {
  121.             throw new RuntimeException("execute sql error " + sql, e);
  122.         }
  123.     }
  124.     protected LeaseInfo convert(Map<String, Object> map) {
  125.         LeaseInfo leaseInfo = new LeaseInfo();
  126.         leaseInfo.setId(getMapLongValue("id", map));
  127.         leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
  128.         leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
  129.         leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
  130.         leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
  131.         Integer status = getMapValue("status", map, Integer.class);
  132.         if (status != null) {
  133.             leaseInfo.setStatus(status);
  134.         }
  135.         leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
  136.         Long version = getMapLongValue("version", map);
  137.         if (version != null) {
  138.             leaseInfo.setVersion(version);
  139.         }
  140.         return leaseInfo;
  141.     }
  142.     @SuppressWarnings("unchecked")
  143.     private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
  144.         Object value = map.get(fieldName);
  145.         if (value == null) {
  146.             return null;
  147.         }
  148.         return (T) value;
  149.     }
  150.     private Long getMapLongValue(String fieldName, Map<String, Object> map) {
  151.         Object value = map.get(fieldName);
  152.         if (value == null) {
  153.             return null;
  154.         }
  155.         if (value instanceof Long) {
  156.             return (Long) value;
  157.         }
  158.         if (value instanceof BigInteger) {
  159.             return ((BigInteger) value).longValue();
  160.         }
  161.         return null;
  162.     }
  163.     private Date getMapDateValue(String fieldName, Map<String, Object> map) {
  164.         Object value = map.get(fieldName);
  165.         if (value == null) {
  166.             return null;
  167.         }
  168.         if (value instanceof Date) {
  169.             return (Date) value;
  170.         }
  171.         if (value instanceof String) {
  172.             return DateUtil.parseTime(((String) value));
  173.         }
  174.         return null;
  175.     }
  176. }
复制代码

  • DBLeaseStorage实现了ILeaseStorage接口,使用jdbc实现了其方法

小结

rocketmq-streams的LeaseService基于db实现了租约和锁,可用于主备场景切换。
以上就是rocketmq-streams的ILeaseService使用示例详解的详细内容,更多关于rocketmq streams ILeaseService的资料请关注脚本之家其它相关文章!

来源:https://www.jb51.net/server/292157odk.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具