翼度科技»论坛 编程开发 .net 查看内容

基于C# Socket实现的简单的Redis客户端

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
前言

    Redis是一款强大的高性能键值存储数据库,也是目前NOSQL中最流行比较流行的一款数据库,它在广泛的应用场景中扮演着至关重要的角色,包括但不限于缓存、消息队列、会话存储等。在本文中,我们将介绍如何基于C# Socket来实现一个简单的Redis客户端类RedisClient,来演示构建请求和输出的相关通信机制。需要注意的是本文只是着重展示如何基于原生的Socket方式与Redis Server进行通信,并不是构建一个强大的Redis开发工具包。
Redis简介

    Redis(Remote Dictionary Server)是一个内存数据库,它支持了非常丰富的数据结构,包括字符串、列表、集合、散列、有序集合等。Redis 提供了高性能的读写操作,可以用于缓存数据、消息队列、分布式锁、会话管理等多种用途。Redis 通常以键值对的方式存储数据,每个键都与一个值相关联,值的类型可以是字符串、列表、散列等。Redis不仅提供了丰富的命令集,用于操作存储在数据库中的数据,还提供了Redis serialization protocol (RESP) 协议来解析Redis Server返回的数据。相关的文档地址如下所示:
Redis 命令指南

    Redis命令是与Redis服务器进行通信的主要方式,通俗点就是发送指定格式的指令用于执行各种操作,包括数据存储、检索、修改和删除等。以下是一些日常使用过程中常见的Redis命令及其用途:

  • GET 和 SET 命令

    • GET key: 用于获取指定键的值。
    • SET key value: 用于设置指定键的值.

  • DEL 命令

    • DEL key: 用于删除指定键.

  • EXPIRE 和 TTL 命令

    • EXPIRE key seconds: 用于为指定键设置过期时间(秒).
    • TTL key: 用于获取指定键的剩余过期时间(秒).
    注意这里的时间单位是秒

  • INCR 和 DECR 命令

    • INCR key: 用于递增指定键的值.
    • DECR key: 用于递减指定键的值.

  • RPUSH 和 LPOP 命令

    • RPUSH key value: 用于将值添加到列表的右侧.
    • LPOP key: 用于从列表的左侧弹出一个值.

  • HSET 和 HGET 命令

    • HSET key field value: 用于设置哈希表中指定字段的值.
    • HGET key field: 用于获取哈希表中指定字段的值.

  • PUBLISH 和 SUBSCRIBE 命令

    • PUBLISH channel message: 用于向指定频道发布消息.
    • SUBSCRIBE channel: 用于订阅指定频道的消息.

当然 Redis 支持的命令远不止这些,它还包括对集合、有序集合、位图、HyperLogLog 等数据结构的操作,以及事务、Lua 脚本执行等高级功能。我们接下来演示的时候也只是展示几个大家比较熟悉的指令,这也是我们学习新知识的时候经常使用的方式,先从最简单最容易的开始入手,循序渐进,这也是微精通所提倡的方式。
Redis协议(RESP)

Redis Serialization Protocol (RESP) 是 Redis 使用的二进制协议,用于客户端和服务器之间的通信。我们可以通过该协议解析Redis服务器返回的命令格式,解析我们想要的数据。RESP具有简洁易解析的特点

  • 简单字符串协议:

    • 格式: +OK\r\n
    • 第一个字节是"+”,后跟消息内容,以"\r\n"(回车和换行)结束。
    • 示例:+OK\r\n

  • 批量字符串协议:

    • 格式: $5\r\nhello\r\n
    • 第一个字节是"$",后跟字符串的字节长度,然后是实际的字符串内容,最后以"\r\n"结束。
    • 示例:$5\r\nhello\r\n

  • 整数协议:

    • 格式: :42\r\n
    • 第一个字节是":",后跟整数的文本表示,以"\r\n"结束。
    • 示例::42\r\n

  • 数组协议:

    • 格式: *3\r\n:1\r\n:2\r\n:3\r\n
    • 第一个字节是"*",后跟数组中元素的数量,然后是数组中每个元素的 RESP 表示,以"\r\n"结束。
    • 示例:*3\r\n:1\r\n:2\r\n:3\r\n

  • 错误协议:

    • 格式: -Error message\r\n
    • 第一个字节是"-",后跟错误消息内容,以"\r\n"结束。
    • 示例:-Error message\r\n

需要注意的是字符串协议里面的长度不是具体字符的长度,而是对应的UTF8对应的字节数组的长度,这一点对于我们解析返回的数据很重要,否则获取数据的时候会影响数据的完整性。
RESP协议是Redis高效性能的关键之一,它相对比较加单,不需要解析各种头信息等,这使得Redis能够在处理大规模数据和请求时表现出色。了解RESP协议可以帮助您更好地理解Redis客户端类 RedisClient 的内部工作原理。可以理解为它属于一种应用层面的协议,通过给定的数据格式解析出想要的数据,这也对我们在实际编程过程中,解决类似的问题,提供了一个不错的思路。
实现RedisClient

    上面我们介绍了一些关于Redis的基础概念,重点介绍了一下关于Redis的命令和RESP,接下来我们就结合上面的理论,基于C# Socket来简单的模拟一下如何和Redis Server进行数据交互。主要就是结合Redis命令和Redis 协议(RESP)来简单的实现。
通信架子

首先来看一下类的结构
  1. public class RedisClient : IDisposable, IAsyncDisposable
  2. {
  3.     //定义默认端口
  4.     private readonly int DefaultPort = 6379;
  5.     //定义默认地址
  6.     private readonly string Host = "localhost";
  7.     //心跳间隔,单位为毫秒
  8.     private readonly int HeartbeatInterval = 30000;
  9.     private bool _isConnected;
  10.     //心跳定时器
  11.     private Timer _heartbeatTimer;
  12.     private Socket _socket;
  13.     public RedisClient(string host = "localhost", int defaultPort = 6379)
  14.     {
  15.         Host = host;
  16.         DefaultPort = defaultPort;
  17.         // 初始化心跳定时器
  18.         _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
  19.     }
  20.     //连接方法
  21.     public async Task ConnectAsync(int timeoutMilliseconds = 5000)
  22.     {
  23.         _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  24.         var cts = new CancellationTokenSource(timeoutMilliseconds);
  25.         await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
  26.         _isConnected = true;
  27.     }
  28.     //心跳方法
  29.     private async void HeartbeatCallback(object state)
  30.     {
  31.         if (_isConnected)
  32.         {
  33.             var pingCommand = "PING\r\n";
  34.             await SendCommandAsync(pingCommand);
  35.         }
  36.     }
  37.    
  38.     //释放逻辑
  39.     public void Dispose()
  40.     {
  41.         // 停止心跳定时器
  42.         _heartbeatTimer.Dispose();
  43.         if (_socket != null)
  44.         {
  45.             _socket.Shutdown(SocketShutdown.Both);
  46.             _socket.Close();
  47.         }
  48.     }
  49.     public ValueTask DisposeAsync()
  50.     {
  51.         Dispose();
  52.         return ValueTask.CompletedTask;
  53.     }
  54. }
复制代码
上面的类定义了实现的大致通信结构,结构中主要涉及到的是通信相关的功能实现,包含Socket的初始化信息、默认的连连接信息、心跳方法、释放逻辑等。首先,在构造函数中,指定了默认的Redis端口(6379)、地址(localhost),并初始化了心跳定时器。连接方法ConnectAsync通过Socket建立与Redis服务器的TCP连接。心跳定时器HeartbeatCallback定期发送PING命令,确保与服务器的连接保持活动。最后,Dispose方法用于释放资源,包括停止心跳定时器和关闭Socket连接,实现了IDisposable和IAsyncDisposable接口。这些功能为RedisClient类提供了基本的连接和资源管理能力。由于我对Socket编程也不是很熟悉,所以定义的可能不是很完善,有比较熟悉的同学,可以多多指导。
发送和解析

有了这个基础的架子之后,我们可以在里面填写具体的实现逻辑了。首先我们来定义发送Redis命令和解析RESP的逻辑
  1. //发送命令
  2. public async Task<string> SendCommandAsync(string command)
  3. {
  4.     // 发送命令的实现
  5.     if (!_isConnected)
  6.     {
  7.         // 如果连接已断开,可以进行重连
  8.         await ConnectAsync();
  9.     }
  10.    
  11.     //Redis的命令是以\r\n为结尾的
  12.     var request = Encoding.UTF8.GetBytes(command + "\r\n");
  13.     //发送命令
  14.     await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
  15.     var response = new StringBuilder();
  16.     var remainingData = string.Empty;
  17.     //初始化响应字符串和剩余数据
  18.     byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
  19.     try
  20.     {
  21.         while (true)
  22.         {
  23.             //读取返回信息
  24.             var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
  25.             //将接收到的数据添加到响应字符串
  26.             var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
  27.             //提取完整的响应并添加到响应字符串中
  28.             var completeResponses = ExtractCompleteResponses(ref responseData);
  29.             foreach (var completeResponse in completeResponses)
  30.             {
  31.                 response.Append(completeResponse);
  32.             }
  33.             remainingData = responseData;
  34.             //结果为\r\n读取结束
  35.             if (response.ToString().EndsWith("\r\n"))
  36.             {
  37.                 break;
  38.             }
  39.         }
  40.     }
  41.     finally
  42.     {
  43.         //释放缓冲区
  44.         ArrayPool<byte>.Shared.Return(receiveBuffer);
  45.     }
  46.     //返回完整的响应字符串
  47.     return response.ToString();
  48. }
  49. private List<string> ExtractCompleteResponses(ref string data)
  50. {
  51.     var completeResponses = new List<string>();
  52.     while (true)
  53.     {
  54.         var index = data.IndexOf("\r\n");
  55.         if (index >= 0)
  56.         {
  57.              // 提取一个完整的响应
  58.             var completeResponse = data.Substring(0, index + 2);
  59.             //将完整的响应添加到列表中
  60.             completeResponses.Add(completeResponse);
  61.             data = data.Substring(index + 2);
  62.         }
  63.         else
  64.         {
  65.             break;
  66.         }
  67.     }
  68.     return completeResponses;
  69. }
  70. private string ParseResponse(string response)
  71. {
  72.     if (response.StartsWith("$"))
  73.     {
  74.         // 处理 Bulk Strings($)
  75.         var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
  76.         if (int.TryParse(lengthStr, out int length))
  77.         {
  78.             if (length == -1)
  79.             {
  80.                 return null!;
  81.             }
  82.             string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
  83.             byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
  84.             string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
  85.             return value;
  86.         }
  87.     }
  88.     else if (response.StartsWith("+"))
  89.     {
  90.         // 处理 Simple Strings(+)
  91.         return response.Substring(1, response.Length - 3);
  92.     }
  93.     else if (response.StartsWith(":"))
  94.     {
  95.         // 处理 Integers(:)
  96.         var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
  97.         if (int.TryParse(valueStr, out int value))
  98.         {
  99.             return value.ToString();
  100.         }
  101.     }
  102.     // 如果响应格式不符合预期,抛出异常
  103.     throw new InvalidOperationException(response);
  104. }
复制代码
上面逻辑涉及到发送和接收Redis消息的三个方法SendCommandAsync、ExtractCompleteResponses、ParseResponse。虽然上面代码中有注释,但是咱们分别I简单的讲解一下这三个方法

  • SendCommandAsync
    该方法主要目的是向 Redis 服务器发送命令并异步接收响应

    • 连接检查:首先,检查连接状态 (_isConnected),如果连接已断开,则调用 ConnectAsync 方法进行重连。
    • 命令转换:将传入的命令字符串转换为 UTF-8 编码的字节数组,附加回车换行符 ("\r\n")。
    • 接收响应:使用异步循环接收来自服务器的响应。在每次接收之后,将接收到的数据添加到响应字符串中,并提取其中的完整响应。
    • 缓冲区管理:为了有效地处理接收到的数据,使用了一个缓冲区 (receiveBuffer),并在方法结束时通过 ArrayPool.Shared.Return 进行释放。
    • 提取完整响应:调用 ExtractCompleteResponses 方法,该方法从响应数据中提取出一个或多个完整的响应,将其从数据中移除,并返回一个列表。

  • ExtractCompleteResponses
    该方法主要用于从接收到的数据中提取出一个或多个完整的响应。

    • completeResponses 列表:用于存储提取出的完整响应的列表。
    • while 循环:循环进行以下操作,直到数据中没有换行符为止。
    • 提取完整响应:如果找到换行符,就提取从数据开头到换行符位置的子字符串,包括换行符本身,构成一个完整的响应。
    • 添加到列表:将提取出的完整响应添加到 completeResponses 列表中。

  • ParseResponse
    该方法主要用于解析从 Redis 服务器接收到的响应字符串。

    • 如果响应以 $ 开头,表示这是一个 Bulk String 类型的响应。
    • 如果响应以 + 开头,表示这是一个 Simple String 类型的响应。
    • 如果响应以 : 开头,表示这是一个 Integer 类型的响应。

简单操作方法

上面有了和Redis通信的基本方法,也有了解析RESP协议的基础方法,接下来咱们实现几个简单的Redis操作指令来展示一下Redis客户端具体是如何工作的,简单的几个方法如下所示
  1. //切换db操作
  2. public async Task SelectAsync(int dbIndex)
  3. {
  4.      var command = $"SELECT {dbIndex}";
  5.      await SendCommandAsync(command);
  6. }
  7. //get操作
  8. public async Task<string> GetAsync(string key)
  9. {
  10.      var command = $"GET {key}";
  11.      return ParseResponse(await SendCommandAsync(command));
  12. }
  13. //set操作
  14. public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
  15. {
  16.      var command = $"SET {key} '{value}'";
  17.      //判断会否追加过期时间
  18.      if (expiry.HasValue)
  19.      {
  20.          command += $" EX {expiry.Value.TotalSeconds}";
  21.      }
  22.      var response = ParseResponse(await SendCommandAsync(command));
  23.      return response == "OK";
  24. }
  25. //支持过期时间的setnx操作
  26. public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
  27. {
  28.     //因为默认的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua
  29.      var command = $"EVAL "if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end" 1 {key} '{value}'";
  30.      if (expiry.HasValue)
  31.      {
  32.          command += $" {expiry.Value.TotalSeconds}";
  33.      }
  34.      var response = ParseResponse(await SendCommandAsync(command));
  35.      return response == "1";
  36. }
  37. //添加支持函过期时间的list push操作
  38. public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
  39. {
  40.      var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
  41.                      if tonumber(ARGV[2]) > 0 then
  42.                          redis.call('EXPIRE', KEYS[1], ARGV[2])
  43.                      end
  44.                      return len";
  45.      var keys = new string[] { key };
  46.      var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
  47.      var response = await ExecuteLuaScriptAsync(script, keys, args);
  48.      return long.Parse(response);
  49. }
  50. //list pop操作
  51. public async Task<string> ListPopAsync(string key)
  52. {
  53.      var command = $"LPOP {key}";
  54.      return ParseResponse(await SendCommandAsync(command));
  55. }
  56. //listrange操作
  57. public async Task<List<string>> ListRangeAsync(string key, int start, int end)
  58. {
  59.      var command = $"LRANGE {key} {start} {end}";
  60.      var response = await SendCommandAsync(command);
  61.      if (response.StartsWith("*0\r\n"))
  62.      {
  63.          return new List<string>();
  64.      }
  65.      
  66.      //由于list range返回了是一个数组,所以单独处理了一下,这里我使用了正则,解析字符串也可以,方法随意
  67.      var values = new List<string>();
  68.      var pattern = @"\$\d+\r\n(.*?)\r\n";
  69.      MatchCollection matches = Regex.Matches(response, pattern);
  70.      foreach (Match match in matches)
  71.      {
  72.          values.Add(match.Groups[1].Value);
  73.      }
  74.      return values;
  75. }
  76. //执行lua脚本的方法
  77. public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
  78. {
  79.     //去除lua里的换行
  80.      script = Regex.Replace(script, @"[\r\n]", "");
  81.      // 构建EVAL命令,将Lua脚本、keys和args发送到Redis服务器
  82.      var command = $"EVAL "{script}" { keys?.Length??0 } ";
  83.      //拼接key和value参数
  84.      if (keys != null && keys.Length != 0)
  85.      {
  86.          command += string.Join(" ", keys.Select(key => $"{key}"));
  87.      }
  88.      if (args != null && args.Length != 0)
  89.      {
  90.          command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
  91.      }
  92.      return ParseResponse(await SendCommandAsync(command));
  93. }
  94. //redis发布操作
  95. public async Task SubscribeAsync(string channel, Action<string, string> handler)
  96. {
  97.      await SendCommandAsync($"SUBSCRIBE {channel}");
  98.      while (true)
  99.      {
  100.          var response = await SendCommandAsync(string.Empty);
  101.          string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
  102.          Match match = Regex.Match(response, pattern);
  103.          if (match.Success)
  104.          {
  105.              string ch = match.Groups[2].Value;
  106.              string message = match.Groups[3].Value;
  107.              handler(ch, message);
  108.          }
  109.      }
  110. }
  111. //redis订阅操作
  112. public async Task PublishAsync(string channel, string message)
  113. {
  114.      await SendCommandAsync($"PUBLISH {channel} {message}");
  115. }
复制代码
上面方法中演示了几个比较常见的操作,很简单,主要是向大家展示Redis命令是如何发送的,从最简单的GET、SET、LIST、发布订阅、执行LUA操作方面着手,如果对Redis命令比较熟悉的话,操作起来还是比较简单的,这里给大家讲解几个比较有代表的方法

  • 首先关于setnx方法,由于自带的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua脚本的方式
  • 自带的lpush也就是上面ListPushAsync方法中封装的操作,自带的也是没办法给定过期时间的,为了保证操作的原子性,我在这里也是用lua进行封装
  • 关于执行lua脚本的时候的时候需要注意lua脚本的格式EVAL script numkeys [key [key ...]] [arg [arg ...]]脚本后面紧跟着的长度是key的个数这个需要注意
  • 最后,自行编写命令的时候需要注意\r\n的处理和引号的转义问题,当然研究的越深,遇到的问题越多
相信大家也看到了,这里我封装的都是几个简单的操作,难度系数不大,因为主要是向大家演示Redis客户端的发送和接收操作是什么样的,甚至我都是直接返回的字符串,真实使用的时候我们使用都是需要封装序列化和反序列化操作的。
完整代码

上面分别对RedisClient类中的方法进行了讲解,接下来我把我封装的类完整的给大家贴出来,由于封装的只是几个简单的方法用于演示,所以也只有一个类,代码量也不多,主要是为了方便大家理解,有想试验的同学可以直接拿走
  1. public class RedisClient : IDisposable, IAsyncDisposable
  2. {
  3.     private readonly int DefaultPort = 6379;
  4.     private readonly string Host = "localhost";
  5.     private readonly int HeartbeatInterval = 30000;
  6.     private bool _isConnected;
  7.     private Timer _heartbeatTimer;
  8.     private Socket _socket;
  9.     public RedisClient(string host = "localhost", int defaultPort = 6379)
  10.     {
  11.         Host = host;
  12.         DefaultPort = defaultPort;
  13.         _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
  14.     }
  15.     public async Task ConnectAsync(int timeoutMilliseconds = 5000)
  16.     {
  17.         _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  18.         var cts = new CancellationTokenSource(timeoutMilliseconds);
  19.         await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
  20.         _isConnected = true;
  21.     }
  22.     public async Task SelectAsync(int dbIndex)
  23.     {
  24.         var command = $"SELECT {dbIndex}";
  25.         await SendCommandAsync(command);
  26.     }
  27.     public async Task<string> GetAsync(string key)
  28.     {
  29.         var command = $"GET {key}";
  30.         return ParseResponse(await SendCommandAsync(command));
  31.     }
  32.     public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
  33.     {
  34.         var command = $"SET {key} '{value}'";
  35.         if (expiry.HasValue)
  36.         {
  37.             command += $" EX {expiry.Value.TotalSeconds}";
  38.         }
  39.         var response = ParseResponse(await SendCommandAsync(command));
  40.         return response == "OK";
  41.     }
  42.     public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
  43.     {
  44.         var command = $"EVAL "if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end" 1 {key} '{value}'";
  45.         if (expiry.HasValue)
  46.         {
  47.             command += $" {expiry.Value.TotalSeconds}";
  48.         }
  49.         var response = ParseResponse(await SendCommandAsync(command));
  50.         return response == "1";
  51.     }
  52.     public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
  53.     {
  54.         var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
  55.                         if tonumber(ARGV[2]) > 0 then
  56.                             redis.call('EXPIRE', KEYS[1], ARGV[2])
  57.                         end
  58.                         return len";
  59.         var keys = new string[] { key };
  60.         var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
  61.         var response = await ExecuteLuaScriptAsync(script, keys, args);
  62.         return long.Parse(response);
  63.     }
  64.     public async Task<string> ListPopAsync(string key)
  65.     {
  66.         var command = $"LPOP {key}";
  67.         return ParseResponse(await SendCommandAsync(command));
  68.     }
  69.     public async Task<long> ListLengthAsync(string key)
  70.     {
  71.         var command = $"LLEN {key}";
  72.         return long.Parse(ParseResponse(await SendCommandAsync(command)));
  73.     }
  74.     public async Task<List<string>> ListRangeAsync(string key, int start, int end)
  75.     {
  76.         var command = $"LRANGE {key} {start} {end}";
  77.         var response = await SendCommandAsync(command);
  78.         if (response.StartsWith("*0\r\n"))
  79.         {
  80.             return new List<string>();
  81.         }
  82.         var values = new List<string>();
  83.         var pattern = @"\$\d+\r\n(.*?)\r\n";
  84.         MatchCollection matches = Regex.Matches(response, pattern);
  85.         foreach (Match match in matches)
  86.         {
  87.             values.Add(match.Groups[1].Value);
  88.         }
  89.         return values;
  90.     }
  91.     public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
  92.     {
  93.         script = Regex.Replace(script, @"[\r\n]", "");
  94.         var command = $"EVAL "{script}" { keys?.Length??0 } ";
  95.         if (keys != null && keys.Length != 0)
  96.         {
  97.             command += string.Join(" ", keys.Select(key => $"{key}"));
  98.         }
  99.         if (args != null && args.Length != 0)
  100.         {
  101.             command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
  102.         }
  103.         return ParseResponse(await SendCommandAsync(command));
  104.     }
  105.     public async Task SubscribeAsync(string channel, Action<string, string> handler)
  106.     {
  107.         await SendCommandAsync($"SUBSCRIBE {channel}");
  108.         while (true)
  109.         {
  110.             var response = await SendCommandAsync(string.Empty);
  111.             string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
  112.             Match match = Regex.Match(response, pattern);
  113.             if (match.Success)
  114.             {
  115.                 string ch = match.Groups[2].Value;
  116.                 string message = match.Groups[3].Value;
  117.                 handler(ch, message);
  118.             }
  119.         }
  120.     }
  121.     public async Task PublishAsync(string channel, string message)
  122.     {
  123.         await SendCommandAsync($"PUBLISH {channel} {message}");
  124.     }
  125.     public async Task<string> SendCommandAsync(string command)
  126.     {
  127.         if (!_isConnected)
  128.         {
  129.             await ConnectAsync();
  130.         }
  131.         var request = Encoding.UTF8.GetBytes(command + "\r\n");
  132.         await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
  133.         var response = new StringBuilder();
  134.         var remainingData = string.Empty;
  135.         byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
  136.         try
  137.         {
  138.             while (true)
  139.             {
  140.                 var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
  141.                 var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
  142.                 var completeResponses = ExtractCompleteResponses(ref responseData);
  143.                 foreach (var completeResponse in completeResponses)
  144.                 {
  145.                     response.Append(completeResponse);
  146.                 }
  147.                 remainingData = responseData;
  148.                 if (response.ToString().EndsWith("\r\n"))
  149.                 {
  150.                     break;
  151.                 }
  152.             }
  153.         }
  154.         finally
  155.         {
  156.             ArrayPool<byte>.Shared.Return(receiveBuffer);
  157.         }
  158.         return response.ToString();
  159.     }
  160.     private List<string> ExtractCompleteResponses(ref string data)
  161.     {
  162.         var completeResponses = new List<string>();
  163.         while (true)
  164.         {
  165.             var index = data.IndexOf("\r\n");
  166.             if (index >= 0)
  167.             {
  168.                 var completeResponse = data.Substring(0, index + 2);
  169.                 completeResponses.Add(completeResponse);
  170.                 data = data.Substring(index + 2);
  171.             }
  172.             else
  173.             {
  174.                 break;
  175.             }
  176.         }
  177.         return completeResponses;
  178.     }
  179.     private string ParseResponse(string response)
  180.     {
  181.         if (response.StartsWith("$"))
  182.         {
  183.             var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
  184.             if (int.TryParse(lengthStr, out int length))
  185.             {
  186.                 if (length == -1)
  187.                 {
  188.                     return null!;
  189.                 }
  190.                 string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
  191.                 byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
  192.                 string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
  193.                 return value;
  194.             }
  195.         }
  196.         else if (response.StartsWith("+"))
  197.         {
  198.             return response.Substring(1, response.Length - 3);
  199.         }
  200.         else if (response.StartsWith(":"))
  201.         {
  202.             var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
  203.             if (int.TryParse(valueStr, out int value))
  204.             {
  205.                 return value.ToString();
  206.             }
  207.         }
  208.         throw new InvalidOperationException(response);
  209.     }
  210.     private async void HeartbeatCallback(object state)
  211.     {
  212.         if (_isConnected)
  213.         {
  214.             var pingCommand = "PING\r\n";
  215.             await SendCommandAsync(pingCommand);
  216.         }
  217.     }
  218.     public void Dispose()
  219.     {
  220.         _heartbeatTimer.Dispose();
  221.         if (_socket != null)
  222.         {
  223.             _socket.Shutdown(SocketShutdown.Both);
  224.             _socket.Close();
  225.         }
  226.     }
  227.     public ValueTask DisposeAsync()
  228.     {
  229.         Dispose();
  230.         return ValueTask.CompletedTask;
  231.     }
  232. }
复制代码
简单使用RedisClient

上面我们封装了RedisClient类,也讲解了里面实现的几个简单的方法,接下来我们就简单的使用一下它,比较简单直接上代码
GET/SET

GET/SET是最基础和最简单的指令,没啥可说的直接上代码
  1. using RedisClient redisClient = new RedisClient();
  2. await redisClient.ConnectAsync();
  3. //切换db
  4. await redisClient.SelectAsync(3);
  5. bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好吗?", TimeSpan.FromSeconds(120));
  6. string getResult = await redisClient.GetAsync("key:foo");
  7. Console.WriteLine("get key:foo:" + getResult);
复制代码
SETNX

SETNX比较常用,很多时候用在做分布式锁的场景,判断资源存不存在的时候经常使用
  1. //第一次setnx返回true
  2. bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
  3. Console.WriteLine("first setnx order:lock:" + setNxResult);
  4. //第一次setnx返回false
  5. setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
  6. Console.WriteLine("second setnx aname:foo:" + setNxResult);
复制代码
PUB/SUB

这里实现的SubscribeAsync和PublishAsync需要使用两个RedisClient实例,因为我上面封装的每个RedisClient只包含一个Socket实例所以ReceiveAsync方法是阻塞的。如果同一个实例的话SubscribeAsync的时候,在使用PublishAsync方法的时候会被阻塞,所以演示的时候使用了两个RedisClient实例
  1. _ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) => { Console.WriteLine($"接收消息:[{ch}]---[{msg}]"); });
  2. Thread.Sleep(2000);
  3. using RedisClient redisClient2 = new RedisClient();
  4. await redisClient2.ConnectAsync();
  5. for (int i = 0; i < 5; i++)
  6. {
  7.     await redisClient2.PublishAsync("order_msg_ch", $"发送消息{i}");
  8.     Thread.Sleep(2000);
  9. }
复制代码
ExecuteLuaScriptAsync

动态执行lua的功能还是比较强大的,在之前的项目中,我也使用类似的功能。我们是模拟抢单/完成的场景,比如业务人员需要自行抢单,每个人最多抢几单,超过阈值则抢单失败,你需要把抢到的完成了才能继续抢单,这种操作就需要借助lua进行操作
  1. //抢单的lua
  2. string takeOrderLuaScript = @"
  3.         local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
  4.         if ordersTaken < tonumber(ARGV[1]) then
  5.             redis.call('INCR', KEYS[1])
  6.             return 1
  7.         else
  8.             return 0
  9.         end";
  10. //完成你手里的订单操作
  11. string completeOrderLuaScript = @"
  12.         local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
  13.         if ordersTaken > 0 then
  14.             redis.call('DECR', KEYS[1])
  15.             return 1
  16.         else
  17.             return 0
  18.         end";
  19. //模拟抢单,最多抢两单
  20. string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
  21. result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
  22. result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
  23. result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
  24. //完成订单
  25. string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
  26. anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
  27. anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
  28. anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
复制代码
还有一个功能也是我们之前遇到的,就是使用Redis实现缓存最新的N条消息,旧的则被抛弃,实现这个功能也需要使用Redis的List结构结合lua的方式
  1. string luaScript = @"
  2.             local record_key = KEYS[1]
  3.             local max_records = tonumber(ARGV[1])
  4.             local new_record = ARGV[2]
  5.             local current_count = redis.call('LLEN', record_key)
  6.             if current_count >= max_records then
  7.                 redis.call('LPOP', record_key)
  8.             end
  9.             redis.call('RPUSH', record_key, new_record)
  10.         ";
  11. //这里限制保存最新的50条数据,旧的数据则被抛弃
  12. for (int i = 0; i < 60; i++)
  13. {
  14.     _ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() });
  15. }
复制代码
List

LIST很多时候会把它当做分布式队列来使用,它提供的操作也比较灵活,咱们这里只是封装了几个最简单的操作,大致的效果如下所示
  1. //lis入队操作
  2. var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1));
  3. res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1));
  4. res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1));
  5. //list出队操作
  6. var str = await redisClient.ListPopAsync("list:2");
  7. //list长度
  8. var length = await redisClient.ListLengthAsync("list:2");
  9. //list range操作
  10. var list = await redisClient.ListRangeAsync("article:list", 0, 10);
复制代码
总结

    本文我们通过理解Redis命令和RESP协议来构建了一个简单RedisClient的实现,方便我们更容易的理解Redis客户端如何与Redis服务器进行通信,这个实现也可以作为学习和理解·Redis客户端·的一个很好的例子。当然我们的这个RedisClient这是了解和学习使用,很多场景我们并没有展示,实际的项目我们还是尽量使用开源的Redis SDK, .net中常用的有StackExchange.Redis、FreeRedis、csredis、NewLife.Redis、Service.Stack.Redis,其中我经常使用的是StackExchange.Redis和FreeRedis整体来说效果还是不错的。总结一下我们文章的主要内容

  • 首先我们讲解了Redis命令的格式
  • 其次我们讲解了Redis协议(RESP)的主要格式以及如何解析
  • 然后我们基于上面的理论简单的封装了一个RedisClient类来演示相关概念
  • 最后我们通过几个示例和我用过的两个lua来简单的演示RedisClient类的使用
    作为新时代的职场人,我乐在探究自己感兴趣的领域,对未知的事物充满好奇,并渴望深入了解。对于常用的核心技术,我不仅要求自己能够熟练运用,更追求深入理解其实现原理。面对新的技术趋势,我决不会视而不见,而是在熟悉更多常用技术栈的同时,努力深入掌握一些重要的知识。我坚信,学无止境,每一步的进步都带来无比的喜悦与成就感。


  
来源:https://www.cnblogs.com/wucy/archive/2023/11/13/csharp_socket_redis_client.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具