翼度科技»论坛 编程开发 mysql 查看内容

Vitess全局唯一ID生成的实现方案

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
为了标识一段数据,通常我们会为其指定一个唯一id,比如利用MySQL数据库中的自增主键。 但是当数据量非常大时,仅靠数据库的自增主键是远远不够的,并且对于分布式数据库只依赖MySQL的自增id无法满足全局唯一的需求。因此,产生了多种解决方案,如UUID,SnowFlake等。下文将介绍Vitess是如何解决这个问题的。
Vitess全局唯一id生成

在Vitess实现方案中,每个设置了全局唯一列的表,都会对应一张sequence序列表。例如对于表user,会对应一张名为user_seq的序列表,原表与序列表的关联关系会记录在元数据中。user表以及user_seq这两张表元数据信息分别如下:
user表元数据:分片键为name列,分片算法为hash;全局唯一列为id列,依赖user_seq表生成具体的值。
  1. {
  2.     "tables": {
  3.         "user": {
  4.             "column_vindexes": [
  5.                 {
  6.                     "column": "name",
  7.                     "name": "hash"
  8.                 }
  9.             ],
  10.             "auto_increment": {
  11.                 "column": "id",
  12.                 "sequence": "user_seq"
  13.             }
  14.         }
  15.     }
  16. }
复制代码
user_seq表元数据:表类型标识为sequence。
  1. {
  2.   "tables": {
  3.     "user_seq": {
  4.       "type": "sequence"
  5.     }
  6.   }
  7. }
复制代码
所有sequence表表结构相同,如下所示:
  1. CREATE TABLE user_seq (
  2.         id int,
  3.         next_id bigint,
  4.         cache bigint,
  5.         PRIMARY KEY (id)
  6. ) COMMENT 'vitess_sequence';
复制代码
且其中只有一条id为0的数据:
  1. mysql> select * from user_seq;
  2. +----+---------+-------+
  3. | id | next_id | cache |
  4. +----+---------+-------+
  5. |  0 |    1000 |   100 |
  6. +----+---------+-------+
复制代码
sequence表可以认为是一个分号器,cache字段表示每次发放号段的个数,next_id列表示每次发放号段的起始值****。Vitess每个分片在初始化时会从sequence根据next_id、cache获取号段保存在VtTablet(MySQL实例前的代理服务)的内存中,当内存中号段耗尽时,再次从sequence表中获取新号段。
我们深入代码看一下具体的实现逻辑:
  1. // 获取sequence的方法
  2. func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
  3.     // 从plan中获取inc(为要获取的id数量)以及tableName
  4.         inc, err := resolveNumber(qre.plan.NextCount, qre.bindVars)
  5.         tableName := qre.plan.TableName()
  6.         t := qre.plan.Table
  7.         t.SequenceInfo.Lock()
  8.         defer t.SequenceInfo.Unlock()
  9.         if t.SequenceInfo.NextVal == 0 || t.SequenceInfo.NextVal+inc > t.SequenceInfo.LastVal {
  10.         // 在事务中运行
  11.                 _, err := qre.execAsTransaction(func(conn *StatefulConnection) (*sqltypes.Result, error) {
  12.             // 使用select for update锁住行数据以免在计算并更新新值期间被其他线程修改
  13.                         query := fmt.Sprintf("select next_id, cache from %s where id = 0 for update", sqlparser.String(tableName))
  14.                         qr, err := qre.execSQL(conn, query, false)
  15.                         nextID, err := evalengine.ToInt64(qr.Rows[0][0])
  16.                         if t.SequenceInfo.LastVal != nextID {
  17.                 // 如果从_seq表读取得到的id值小于tablet缓存中id,则将缓存中的值更新到_seq表中
  18.                                 if nextID < t.SequenceInfo.LastVal {
  19.                                         log.Warningf("Sequence next ID value %v is below the currently cached max %v, updating it to max", nextID, t.SequenceInfo.LastVal)
  20.                                         nextID = t.SequenceInfo.LastVal
  21.                                 }
  22.                                 t.SequenceInfo.NextVal = nextID
  23.                                 t.SequenceInfo.LastVal = nextID
  24.                         }
  25.                         cache, err := evalengine.ToInt64(qr.Rows[0][1])
  26.             // 按照cache的倍数获取到大于inc量的缓存,计算出新newLast
  27.                         newLast := nextID + cache
  28.                         for newLast < t.SequenceInfo.NextVal+inc {
  29.                                 newLast += cache
  30.                         }
  31.             // 将新的边界值更新到_seq表中
  32.                         query = fmt.Sprintf("update %s set next_id = %d where id = 0", sqlparser.String(tableName), newLast)
  33.                         _, err = qre.execSQL(conn, query, false)
  34.                         t.SequenceInfo.LastVal = newLast
  35.                 })
  36.         }
  37.     // 返回获取的sequence值 更新SequenceInfo
  38.         ret := t.SequenceInfo.NextVal
  39.         t.SequenceInfo.NextVal += inc
  40.         return ret
  41. }
复制代码
从源码中可以看到:

  • Vitess使用了事务内锁行(select for update)的方式保证了多线程下查询并更新序列表不会互相干扰。
  • 如果VtTablet中自增序列值缓存不足或者号段耗尽后,从sequence表重新获取值,并更新序列表中next_id字段。
  • 根据inc的大小,即所需ID的数量,VtTablet会以cache为最小块,从序列表中获取n*cache个数量的id缓存在内存中。
补充说明:
1. sequence表为非拆分的表。
2. 全局唯一id生成无法保证连续性。
VtDriver实现方式

在Vitess的SDK客户端方案VtDriver中,sequence的生成逻辑被封装在了MySQL驱动包本身当中,与Vitess的方案类似,对于设置了全局自增的表,其sequence的生成同样依赖于对应的序列表,序列表的结构与Vitess的序列表相同(参上),但是读取并更新字段next_id的方式使用了CAS的方案:
  1. public long[] querySequenceValue(Vcursor vCursor, ResolvedShard resolvedShard, String sequenceTableName) throws SQLException, InterruptedException {
  2.         // cas 重试次数限制
  3.     int retryTimes = DEFAULT_RETRY_TIMES;
  4.     while (retryTimes > 0) {
  5.             // 查询_seq表中的sequence设置,其中cache为本地缓存的大小
  6.         String querySql = "select next_id, cache from " + sequenceTableName + " where id = 0";
  7.         VtResultSet vtResultSet = (VtResultSet) vCursor.executeStandalone(querySql, new HashMap<>(), resolvedShard, false);
  8.         long[] sequenceInfo = getVtResultValue(vtResultSet);
  9.         long next = sequenceInfo[0];
  10.         long cache = sequenceInfo[1];
  11.                 // 将计算出的next_id的值尝试更新到_seq表中,如果失败则重新读取并更新,直到成功为止
  12.         String updateSql = "update " + sequenceTableName + " set next_id = " + (next + cache) + " where next_id =" + sequenceInfo[0];
  13.         VtRowList vtRowList = vCursor.executeStandalone(updateSql, new HashMap<>(), resolvedShard, false);
  14.         if (vtRowList.getRowsAffected() == 1) {
  15.             sequenceInfo[0] = next;
  16.             return sequenceInfo;
  17.         }
  18.         retryTimes--;
  19.         Thread.sleep(ThreadLocalRandom.current().nextInt(1, 6));
  20.     }
  21.     throw new SQLException("Update sequence cache failed within retryTimes: " + DEFAULT_RETRY_TIMES);
  22. }
复制代码
在源码中可以看到:

  • 在整个查询并更新序列表的过程中,没有出现Vitess实现中的开启事务以及产生锁表的情况,而是使用了CAS更新的方式。
  • 利用update user_seq set next_id=? where next_id=?执行的返回值判断是否语句是否更新成功,如果失败则重新查询next_id的值,计算新值再尝试更新, 如果出现并发争抢的情况,Vtdriver中允许最多的重试次数DEFAULT_RETRY_TIMES为100次。
VtDriver中使用sequence的方式与MySQL自增键类似,如果设置了sequence的表在插入数据的过程中,自增列没有给定具体的值,会直接从本地缓存中获取自增ID,如果无缓存或者缓存不足时,才会路由到序列表所在MySQL服务获取sequence值
事务+锁表 or CAS ?

在Vitess实现sequence的源码当中,其更新序列表的过程为:开启事务时执行select for update,使用表锁,保证多线程安全。在现实往往充满了不确定性,我们可以想象一下:如果应用锁了数据库中的表后,由于自身的性能原因等而迟迟没有执行commit操作,或者应用节点出现了宕机的情况,此时:
应用宕机后,其持有的锁不会被释放!后续任何其他连接对于该表的任何SQL都会被持续阻塞!
​VtDriver作为Vitess的客户端方案,如果其sequence实现采用事务锁的方式,由于各个应用端都会与MySQL服务直连,即各个应用获取sequence的过程都会产生锁表的行为。此时,一旦应用端由于某些原因出现锁表时长增大,甚至于应用宕机的情况,则所有应用都会由于其锁表而产生非常明显的性能下降甚至死锁。采用cas的方式使得整个过程不需要显式的开启事务,不需要锁行,自然也不存在潜在的死锁风险。当然,CAS在并发高于一定程度时会出现各个线程互相争抢资源,此时会有更新失败不断重试的情况发生,给CPU带来一定的压力,而这可以通过设置更大的cache值,增加本地缓存数量的方式来调节。
作者:京东零售 金越
来源:京东云开发者社区 转载请注明来源

来源:https://www.cnblogs.com/jingdongkeji/p/17729733.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具