七剑下江湖用 发表于 2023-7-16 11:10:01

rocketmq-streams的ILeaseService使用示例详解



本文主要研究一下rocketmq-streams的ILeaseService

ILeaseService

/**
* 通过db实现租约和锁,可以更轻量级,减少其他中间件的依赖 使用主备场景,只有一个实例运行,当当前实例挂掉,在一定时间内,会被其他实例接手 也可以用于全局锁
*/
public interface ILeaseService {
    /**
   * 默认锁定时间
   */
    static final int DEFALUT_LOCK_TIME = 60 * 5;
    /**
   * 检查某用户当前时间是否具有租约。这个方法是纯内存操作,无性能开销
   *
   * @return true,租约有效;false,租约无效
   */
    boolean hasLease(String name);
    /**
   * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
   *
   * @param name 租约名称,无特殊要求,相同名称会竞争租约
   */
    void startLeaseTask(String name);
    /**
   * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
   *
   * @param name   租约名称,无特殊要求,相同名称会竞争租约
   * @param callback 当第一获取租约时,回调此函数
   */
    void startLeaseTask(final String name, ILeaseGetCallback callback);
    /**
   * 申请租约,会启动一个线程,不停申请租约,直到申请成功。 申请成功后,每 租期/2 续约。 如果目前被其他租户获取租约,只有在对方租约失效,后才允许新的租户获取租约
   *
   * @param name            租约名称,无特殊要求,相同名称会竞争租约
   * @param leaseTermSecond 租期,在租期内可以做业务处理,单位是秒
   * @param callback      当第一获取租约时,回调此函数
   */
    void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);
    /**
   * 申请锁,无论成功与否,立刻返回。如果不释放,最大锁定时间是5分钟
   *
   * @param name       业务名称
   * @param lockerName 锁名称
   * @return 是否枷锁成功
   */
    boolean lock(String name, String lockerName);
    /**
   * 申请锁,无论成功与否,立刻返回。默认锁定时间是5分钟
   *
   * @param name         业务名称
   * @param lockerName   锁名称
   * @param lockTimeSecond 如果不释放,锁定的最大时间,单位是秒
   * @return 是否枷锁成功
   * @return
   */
    boolean lock(String name, String lockerName, int lockTimeSecond);
    /**
   * 申请锁,如果没有则等待,等待时间可以指定,如果是-1 则无限等待。如果不释放,最大锁定时间是5分钟
   *
   * @param name       业务名称
   * @param lockerName 锁名称
   * @param waitTime   没获取锁时,最大等待多长时间,如果是-1 则无限等待
   * @return 是否枷锁成功
   */
    boolean tryLocker(String name, String lockerName, long waitTime);
    /**
   * 申请锁,如果没有则等待,等待时间可以指定,如果是-1 则无限等待。如果不释放,最大锁定时间是lockTimeSecond
   *
   * @param name         业务名称
   * @param lockerName   锁名称
   * @param waitTime       没获取锁时,最大等待多长时间,如果是-1 则无限等待
   * @param lockTimeSecond 如果不释放,锁定的最大时间,单位是秒
   * @return 是否枷锁成功
   */
    boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);
    /**
   * 释放锁
   *
   * @param name
   * @param lockerName
   * @return
   */
    boolean unlock(String name, String lockerName);
    /**
   * 对于已经获取锁的,可以通过这个方法,一直持有锁。 和租约的区别是,当释放锁后,无其他实例抢占。无法实现主备模式
   *
   * @param name         业务名称
   * @param lockerName   锁名称
   * @param lockTimeSecond 租期,这个方法会自动续约,如果不主动释放,会一直持有锁
   * @return 是否成功获取锁
   */
    boolean holdLock(String name, String lockerName, int lockTimeSecond);
    /**
   * 是否持有锁,不会申请锁。如果以前申请过,且未过期,返回true,否则返回false
   *
   * @param name       业务名称
   * @param lockerName 锁名称
   * @return
   */
    boolean hasHoldLock(String name, String lockerName);
    List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);
}

[*]ILeaseService接口定义了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法

BasedLesaseImpl

public abstract class BasedLesaseImpl implements ILeaseService {
    private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);
    private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
    private static final AtomicBoolean syncStart = new AtomicBoolean(false);
    private static final int synTime = 120;// 5分钟的一致性hash同步时间太久了,改为2分钟
    protected ScheduledExecutorService taskExecutor = null;
    protected int leaseTerm = 300 * 2;                                  // 租约时间
    // protected transient JDBCDriver jdbcDataSource = null;
    protected ILeaseStorage leaseStorage;
    protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>();    // 每个lease name对应的租约到期时间
    public BasedLesaseImpl() {
      taskExecutor = new ScheduledThreadPoolExecutor(10);
    }
    /**
   * lease_name: consistent_hash_ip, lease_user_ip: ip,定时刷新lease_info表,检查一致性hash环的节点情况
   *
   * @param name
   * @return
   */
    @Override
    public boolean hasLease(String name) {
      // 内存中没有租约信息则表示 没有租约
      Date leaseEndTime = leaseName2Date.get(name);
      if (leaseEndTime == null) {
            // LOG.info("内存中根据 " + name + "没有查询到租约信息,表示没有租约");
            return false;
      }
      // LOG.info("查询是否有租约 name:" + name + " ,当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
      // + " 租约到期时间 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
      // 有租约时间,并且租约时间大于当前时间,表示有租约信息
      if (new Date().before(leaseEndTime)) {
            return true;
      }
      return false;
    }
    private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();
    @Override
    public void startLeaseTask(final String name) {
      startLeaseTask(name, this.leaseTerm, null);
    }
    @Override
    public void startLeaseTask(final String name, ILeaseGetCallback callback) {
      startLeaseTask(name, this.leaseTerm, callback);
    }
    @Override
    public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {
      ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
      startLeaseTask(name, applyTask, leaseTerm / 2, true);
    }
    /**
   * 启动定时器,定时执行任务,确保任务可重入
   *
   * @param name
   * @param runnable   具体任务
   * @param scheduleTime 调度时间
   * @param startNow   是否立刻启动一次
   */
    protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {
      AtomicBoolean isStartLease = startLeaseMap.get(name);//多次调用,只启动一次定时任务
      if (isStartLease == null) {
            synchronized (this) {
                isStartLease = startLeaseMap.get(name);
                if (isStartLease == null) {
                  isStartLease = new AtomicBoolean(false);
                  startLeaseMap.put(name, isStartLease);
                }
            }
      }
      if (isStartLease.compareAndSet(false, true)) {
            if (startNow) {
                runnable.run();
            }
            taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
      }
    }
    //......
}

[*]BasedLesaseImpl声明实现了ILeaseService,它依赖ILeaseStorage,startLeaseTask方法会创建ApplyTask,然后以固定间隔调度执行

ApplyTask

/**
   * 续约任务
   */
    protected class ApplyTask implements Runnable {
      protected String name;
      protected int leaseTerm;
      protected ILeaseGetCallback callback;
      public ApplyTask(int leaseTerm, String name) {
            this(leaseTerm, name, null);
      }
      public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
            this.name = name;
            this.leaseTerm = leaseTerm;
            this.callback = callback;
      }
      @Override
      public void run() {
            try {
                // LOG.info("LeaseServiceImpl name: " + name + "开始获取租约...");
                AtomicBoolean newApplyLease = new AtomicBoolean(false);
                Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
                if (leaseDate != null) {
                  leaseName2Date.put(name, leaseDate);
                  LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 获取租约成功, 租约到期时间为 "
                        + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
                } else {
                  // fix.2020.08.13 这时name对应的租约可能还在有效期内,或者本机还持有租约,需要remove
                  //leaseName2Date.remove(name);
                  LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 获取租约失败 ");
                }
                if (newApplyLease.get() &amp;&amp; callback != null) {
                  callback.callback(leaseDate);
                }
            } catch (Exception e) {
                LOG.error(" LeaseServiceImpl name: " + name + "" + getSelfUser() + " 获取租约出现异常 ", e);
            }
      }
    }
    /**
   * 申请租约,如果当期租约有效,直接更新一个租约周期,如果当前租约无效,先查询是否有有效的租约,如果有申请失败,否则直接申请租约
   */
    protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {
      // 计算下一次租约时间 = 当前时间 + 租约时长
      Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);
      // 1 如果已经有租约,则更新租约时间(内存和数据库)即可
      if (hasLease(name)) {
            // LOG.info("用户已有租约,更新数据库和内存中的租约信息");
            // 更新数据库
            LeaseInfo leaseInfo = queryValidateLease(name);
            if (leaseInfo == null) {
                LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
                return null;
            }
            // fix.2020.08.13,与本机ip相等且满足一致性hash分配策略,才续约,其他情况为null
            String leaseUserIp = leaseInfo.getLeaseUserIp();
            if (!leaseUserIp.equals(getSelfUser())) {
                return null;
            }
            leaseInfo.setLeaseEndDate(nextLeaseDate);
            updateLeaseInfo(leaseInfo);
            return nextLeaseDate;
      }
      // 2 没有租约情况 判断是否可以获取租约,只要租约没有被其他人获取,则说明有有效租约
      boolean success = canGetLease(name);
      if (!success) { // 表示被其他机器获取到了有效的租约
            // LOG.info("其他机器获取到了有效的租约");
            return null;
      }
      // 3 没有租约而且可以获取租约的情况,则尝试使用数据库原子更新的方式获取租约,保证只有一台机器成功获取租约,而且可以运行
      boolean flag = tryGetLease(name, nextLeaseDate);
      if (flag) { // 获取租约成功
            newApplyLease.set(true);
            return nextLeaseDate;
      }
      return null;
    }

[*]ApplyTask内部调用applyLeaseTask,如果已有租约则更新租约时间,没有租约则判断是否可以获取租约,可以则执行tryGetLease

tryGetLease

/**
   * 更新数据库,占用租期并更新租期时间
   *
   * @param time
   */
    protected boolean tryGetLease(String name, Date time) {
      // LOG.info("尝试获取租约 lease name is : " + name + " 下次到期时间: "
      // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
      LeaseInfo validateLeaseInfo = queryValidateLease(name);
      if (validateLeaseInfo == null) {// 这里有两种情况 1 数据库里面没有租约信息 2 数据库里面有租约信息但是已经过期
            Integer count = countLeaseInfo(name);
            if (count == null || count == 0) {// 表示现在数据库里面没有任何租约信息,插入租约成功则表示获取成功,失败表示在这一时刻其他机器获取了租约
                // LOG.info("数据库中暂时没有租约信息,尝试原子插入租约:" + name);
                // fix.2020.08.13,经过一致性hash计算,该名字的任务不应该在本机执行,直接返回,无需插入。只有分配到hash执行权限的机器才可以插入并获取租约
                if (!getSelfUser().equals(getConsistentHashHost(name))) {
                  return false;
                }
                validateLeaseInfo = new LeaseInfo();
                validateLeaseInfo.setLeaseName(name);
                validateLeaseInfo.setLeaseUserIp(getSelfUser());
                validateLeaseInfo.setLeaseEndDate(time);
                validateLeaseInfo.setStatus(1);
                validateLeaseInfo.setVersion(1);
                if (insert(validateLeaseInfo)) {
                  LOG.info("数据库中暂时没有租约信息,原子插入成功,获取租约成功:" + name);
                  return true;
                } else {
                  LOG.info("数据库中暂时没有租约信息,原子插入失败,已经被其他机器获取租约:" + name);
                  return false;
                }
            } else { // 表示数据库里面有一条但是无效,这里需要两台机器按照version进行原子更新,更新成功的获取租约
                // LOG.info("数据库中有一条无效的租约信息,尝试根据版本号去原子更新租约信息:" + name);
                LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
                if (inValidateLeaseInfo == null) {// 说明这个时候另外一台机器获取成功了
                  LOG.info("另外一台机器获取成功了租约:" + name);
                  return false;
                }
                // fix.2020.08.13,机器重启之后,该名字的任务已经不分配在此机器上执行,直接返回,无需更新数据库
                if (!getSelfUser().equals(getConsistentHashHost(name))) {
                  return false;
                }
                inValidateLeaseInfo.setLeaseName(name);
                inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
                inValidateLeaseInfo.setLeaseEndDate(time);
                inValidateLeaseInfo.setStatus(1);
                boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
                if (success) {
                  LOG.info("LeaseServiceImpl 原子更新租约成功,当前机器获取到了租约信息:" + name);
                } else {
                  LOG.info("LeaseServiceImpl 原子更新租约失败,租约被其他机器获取:" + name);
                }
                return success;
            }
      } else { // 判断是否是自己获取了租约,如果是自己获取了租约则更新时间(内存和数据库),
            // 这里是为了解决机器重启的情况,机器重启,内存中没有租约信息,但是实际上该用户是有租约权限的
            // fix.2020.08.13,租约的ip与本机ip相等,且满足一致性hash策略,才会被本机执行
            String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
            if (leaseUserIp.equals(getSelfUser())) {
                // 如果当期用户有租约信息,则更新数据库
                validateLeaseInfo.setLeaseEndDate(time);
                boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
                if (hasUpdate) {
                  LOG.info(
                        "LeaseServiceImpl机器重启情况,当前用户有租约信息,并且更新数据库成功,租约信息为 name :" + validateLeaseInfo.getLeaseName()
                            + " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期时间 : " + new SimpleDateFormat(
                            "yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
                  return true;
                } else {
                  LOG.info("LeaseServiceImpl 机器重启情况,当前用户有租约信息,并且更新数据库失败,表示失去租约:" + name);
                  return false;
                }
            }
            // LOG.info("LeaseServiceImpl 租约被其他机器获取,租约信息为 name :" + validateLeaseInfo.getLeaseName() + " ip : "
            // + validateLeaseInfo.getLeaseUserIp() + " 到期时间 : "
            // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
            return false;
      }
    }
    protected LeaseInfo queryValidateLease(String name) {
      //String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";
      //// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
      //return queryLease(name, sql);
      return leaseStorage.queryValidateLease(name);
    }

[*]tryGetLease先通过queryValidateLease查询租约信息,若没有租约则插入,若过期则根据版本号更新,若已有租约则判断是否是自己获取了租约,是则更新租约信息

LeaseServiceImpl

public class LeaseServiceImpl extends BasedLesaseImpl {
    private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);
    private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();
    protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
    //如果是抢占锁状态中,则不允许申请锁
    public LeaseServiceImpl() {
      super();
    }
    /**
   * 尝试获取锁,可以等待waitTime,如果到点未返回,则直接返回。如果是-1,则一直等待
   *
   * @param name       业务名称
   * @param lockerName 锁名称
   * @param waitTime   等待时间,是微秒单位
   * @return
   */
    @Override
    public boolean tryLocker(String name, String lockerName, long waitTime) {
      return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME);
    }
    @Override
    public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {
      long now = System.currentTimeMillis();
      boolean success = lock(name, lockerName, lockTimeSecond);
      while (!success) {
            if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) {
                break;
            }
            success = lock(name, lockerName, lockTimeSecond);
            if (success) {
                return success;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                LOG.error("LeaseServiceImpl try locker error", e);
            }
      }
      return success;
    }
    @Override
    public boolean lock(String name, String lockerName) {
      return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME);
    }
    @Override
    public boolean lock(String name, String lockerName, int leaseSecond) {
      lockerName = createLockName(name, lockerName);
      Future future = seizeLockingFuntures.get(lockerName);
      if (future != null && ((HoldLockFunture)future).isDone == false) {
            return false;
      }
      Date nextLeaseDate =
            DateUtil.addSecond(new Date(), leaseSecond);// 默认锁定5分钟,用完需要立刻释放.如果时间不同步,可能导致锁失败
      return tryGetLease(lockerName, nextLeaseDate);
    }
    @Override
    public boolean unlock(String name, String lockerName) {
      // LOG.info("LeaseServiceImpl unlock,name:" + name);
      lockerName = createLockName(name, lockerName);
      LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
      if (validateLeaseInfo == null) {
            LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
      }
      if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {
            validateLeaseInfo.setStatus(0);
            updateDBLeaseInfo(validateLeaseInfo);
      }
      HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
      if (holdLockTask != null) {
            holdLockTask.close();
      }
      leaseName2Date.remove(lockerName);
      return false;
    }
    /**
   * 如果有锁,则一直持有,如果不能获取,则结束。和租约不同,租约是没有也会尝试重试,一备对方挂机,自己可以接手工作
   *
   * @param name
   * @param secondeName
   * @param lockTimeSecond 获取锁的时间
   * @return
   */
    @Override
    public boolean holdLock(String name, String secondeName, int lockTimeSecond) {
      if (hasHoldLock(name, secondeName)) {
            return true;
      }
      synchronized (this) {
            if (hasHoldLock(name, secondeName)) {
                return true;
            }
            String lockerName = createLockName(name, secondeName);
            Date nextLeaseDate =
                DateUtil.addSecond(new Date(), lockTimeSecond);
            boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的时间是leaseTerm
            if (!success) {
                return false;
            }
            leaseName2Date.put(lockerName, nextLeaseDate);
            if (!holdLockTasks.containsKey(lockerName)) {
                HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
                holdLockTask.start();
                holdLockTasks.putIfAbsent(lockerName, holdLockTask);
            }
      }
      return true;
    }
    /**
   * 是否持有锁,不访问数据库,直接看本地
   *
   * @param name
   * @param secondeName
   * @return
   */
    @Override
    public boolean hasHoldLock(String name, String secondeName) {
      String lockerName = createLockName(name, secondeName);
      return hasLease(lockerName);
    }
    @Override
    public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {
      String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
      return queryValidateLeaseByNamePrefix(leaseNamePrefix);
    }
    //......
}

[*]LeaseServiceImpl继承了BasedLesaseImpl,tryLocker方法会根据等待时间循环执行lock,lock方法则执行tryGetLease,unlock方法则更新租约信息,同时移除内存记录;holdLock则通过hasHoldLock判断是否持有锁,若有则返回,没有则执行tryGetLease

ILeaseStorage

public interface ILeaseStorage {
    /**
   * 更新lease info,需要是原子操作,存储保障多线程操作的原子性
   *
   * @param leaseInfo 租约表数据
   * @return
   */
    boolean updateLeaseInfo(LeaseInfo leaseInfo);
    /**
   * 统计这个租约名称下,LeaseInfo对象个数
   *
   * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
   * @return
   */
    Integer countLeaseInfo(String leaseName);
    /**
   * 查询无效的的租约
   *
   * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
   * @return
   */
    LeaseInfo queryInValidateLease(String leaseName);
    /**
   * 查询有效的的租约
   *
   * @param leaseName 租约名称,无特殊要求,相同名称会竞争租约
   * @return
   */
    LeaseInfo queryValidateLease(String leaseName);
    /**
   * 按前缀查询有效的租约信息
   *
   * @param namePrefix
   * @return
   */
    List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);
    /**
   * 增加租约
   *
   * @param leaseInfo 租约名称,无特殊要求,相同名称会竞争租约
   */
    void addLeaseInfo(LeaseInfo leaseInfo);
}

[*]ILeaseStorage接口定义了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法

DBLeaseStorage

public class DBLeaseStorage implements ILeaseStorage {
    private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
    protected JDBCDriver jdbcDataSource;
    private String url;
    protected String userName;
    protected String password;
    protected String jdbc;
    public DBLeaseStorage(String jdbc, String url, String userName, String password) {
      this.jdbc = jdbc;
      this.url = url;
      this.userName = userName;
      this.password = password;
      jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
    }
    @Override
    public boolean updateLeaseInfo(LeaseInfo leaseInfo) {
      String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
      String whereSQL = " WHERE id=#{id} and version=#{version}";
      if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {
            sql += ",lease_name=#{leaseName}";
      }
      if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {
            sql += ",lease_user_ip=#{leaseUserIp}";
      }
      if (leaseInfo.getLeaseEndDate() != null) {
            sql += ",lease_end_time=#{leaseEndDate}";
      }
      sql += whereSQL;
      sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
      try {
            int count = getOrCreateJDBCDataSource().update(sql);
            boolean success = count > 0;
            if (success) {
                synchronized (this) {
                  leaseInfo.setVersion(leaseInfo.getVersion() + 1);
                }
            } else {
                System.out.println(count);
            }
            return success;
      } catch (Exception e) {
            LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
            throw new RuntimeException("execute sql error " + sql, e);
      }
    }
    @Override
    public Integer countLeaseInfo(String leaseName) {
      String sql = "SELECT count(*) as c FROM lease_infoWHERE lease_name = '" + leaseName + "' and status = 1";
      try {
            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
            if (rows == null || rows.size() == 0) {
                return null;
            }
            Long value = (Long) rows.get(0).get("c");
            return value.intValue();
      } catch (Exception e) {
            throw new RuntimeException("execute sql error " + sql, e);
      }
    }
    @Override
    public LeaseInfo queryInValidateLease(String leaseName) {
      String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
      LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
      return queryLease(leaseName, sql);
    }
    @Override
    public LeaseInfo queryValidateLease(String leaseName) {
      String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";
      return queryLease(leaseName, sql);
    }
    @Override
    public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {
      String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";
      try {
            List<LeaseInfo> leaseInfos = new ArrayList<>();
            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
            if (rows == null || rows.size() == 0) {
                return null;
            }
            for (Map<String, Object> row : rows) {
                LeaseInfo leaseInfo = convert(row);
                leaseInfos.add(leaseInfo);
            }
            return leaseInfos;
      } catch (Exception e) {
            throw new RuntimeException("execute sql error " + sql, e);
      }
    }
    @Override
    public void addLeaseInfo(LeaseInfo leaseInfo) {
      String sql =
            " REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
                + " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
      sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
      try {
            getOrCreateJDBCDataSource().execute(sql);
      } catch (Exception e) {
            LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
            throw new RuntimeException("execute sql error " + sql, e);
      }
    }
    protected JDBCDriver getOrCreateJDBCDataSource() {
      if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
            synchronized (this) {
                if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
                  this.jdbcDataSource =
                        DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
                }
            }
      }
      return jdbcDataSource;
    }
    protected LeaseInfo queryLease(String name, String sql) {
      try {
            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
            if (rows == null || rows.size() == 0) {
                return null;
            }
            return convert(rows.get(0));
      } catch (Exception e) {
            throw new RuntimeException("execute sql error " + sql, e);
      }
    }
    protected LeaseInfo convert(Map<String, Object> map) {
      LeaseInfo leaseInfo = new LeaseInfo();
      leaseInfo.setId(getMapLongValue("id", map));
      leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
      leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
      leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
      leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
      Integer status = getMapValue("status", map, Integer.class);
      if (status != null) {
            leaseInfo.setStatus(status);
      }
      leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
      Long version = getMapLongValue("version", map);
      if (version != null) {
            leaseInfo.setVersion(version);
      }
      return leaseInfo;
    }
    @SuppressWarnings("unchecked")
    private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {
      Object value = map.get(fieldName);
      if (value == null) {
            return null;
      }
      return (T) value;
    }
    private Long getMapLongValue(String fieldName, Map<String, Object> map) {
      Object value = map.get(fieldName);
      if (value == null) {
            return null;
      }
      if (value instanceof Long) {
            return (Long) value;
      }
      if (value instanceof BigInteger) {
            return ((BigInteger) value).longValue();
      }
      return null;
    }
    private Date getMapDateValue(String fieldName, Map<String, Object> map) {
      Object value = map.get(fieldName);
      if (value == null) {
            return null;
      }
      if (value instanceof Date) {
            return (Date) value;
      }
      if (value instanceof String) {
            return DateUtil.parseTime(((String) value));
      }
      return null;
    }
}

[*]DBLeaseStorage实现了ILeaseStorage接口,使用jdbc实现了其方法

小结

rocketmq-streams的LeaseService基于db实现了租约和锁,可用于主备场景切换。
以上就是rocketmq-streams的ILeaseService使用示例详解的详细内容,更多关于rocketmq streams ILeaseService的资料请关注脚本之家其它相关文章!

来源:https://www.jb51.net/server/292157odk.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: rocketmq-streams的ILeaseService使用示例详解