翼度科技»论坛 编程开发 .net 查看内容

在.NET Framework中使用RocketMQ(阿里云版)实战【第二章】

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
章节
第一章:https://www.cnblogs.com/kimiliucn/p/17662052.html
第二章:https://www.cnblogs.com/kimiliucn/p/17667200.html


作者:西瓜程序猿
主页传送门:https://www.cnblogs.com/kimiliucn/
上一章节主要介绍了RocketMQ基本介绍和前期准备,以及如何创建生产者。那这一章节主要介绍一下消费端的实现、如何发布消费端,以及遇到的坑怎么去解决。

四、消费端实现

4.1-创建消费者

4.1.1-创建Windows服务项目

(1)右击解决方案,然后依次点击【添加】——>【新建项目】,然后选择【 Windows 服务(.NET Framework) 】,点击下一步。
注意:Windows服务只有在.NET Framework版本中才有了,在跨平台中使用Worker Service。

(2)修改项目名称,项目名称[西瓜程序猿]写的是【RocketMQ.Consumer】,然后框架选择的是【.NET Farmework 4.8】,这个可以根据自己的需要填写和选择,然后点击【创建】。

创建好的目录如下:【Program.cs】是主程序的入口,【Service1.cs】是服务的入口,可以创建多个,然后在Prodrams.cs中配置就好了。

(3)【Service1】服务名称可以重命名修改,此处我重命名为【RocketMQConsumerService】, Program.cs文件中也相对应的也要进行修改。


(4)然后我们就可以在【RocketMQConsumerService】中写业务逻辑代码了,有很多种方式可以定位到要写的具体代码文件,先列举两种常用的。
方法一:在【program.cs】文件中,找到这个类,按键盘上的F12可以直接进入查看文件。

方法二:直接右击,然后点击【查看代码】。

业务代码写到这里面:

到这一步消费者服务就创建好了,然后就写具体的业务代码就行了。注意:服务必须至少重写 OnStart 和 OnStop 才有用。

4.1.2-项目依赖配置

(1)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Consumer】项目,然后点击【属性】。

(2)点击左侧选项的【生成】,然后将目标平台改为【x64】。

(3)将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin/Debug】目录下。
资源包:

项目:


4.1.3-配置日志(log4net)

(1)为了方便测试,先介绍一下如何使用log4net做日志记录,当日志启动时和停止时我们记录一下。我们在项目目录下新建一个文件夹【LogConfig】,然后再创建一个文件为【log4net.config】。

(2)【log4net.config】内容如下。
  1. <?xml version="1.0" encoding="utf-8"?>
  2. <configuration>
  3.         <configSections>
  4.                 <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
  5.         </configSections>
  6.         <system.web>
  7.                 <compilation debug="true" targetFramework="4.5.2" />
  8.                 <httpRuntime targetFramework="4.5.2" />
  9.         </system.web>
  10.         <log4net>
  11.                
  12.                
  13.                
  14.                 <appender name="ErrorAppender" type="log4net.Appender.RollingFileAppender">
  15.                        
  16.                         <file value="log/error/error_" />
  17.                        
  18.                         <appendToFile value="true"/>
  19.                        
  20.                         <rollingStyle value="Date"/>
  21.                        
  22.                         <datePattern value="yyyy-MM-dd'.log'"/>
  23.                        
  24.                         <staticLogFileName value="false"/>
  25.                        
  26.                         <param name="MaxSizeRollBackups" value="100"/>
  27.                        
  28.                         <maximumFileSize value="50MB" />
  29.                        
  30.                         <layout type="log4net.Layout.PatternLayout">
  31.                                
  32.                                
  33.                                
  34.                                
  35.                                
  36.                                 <conversionPattern value="%n==========
  37.                                   %n【日志级别】%-5level
  38.                                   %n【记录时间】%date
  39.                                   %n【执行时间】[%r]毫秒
  40.                                   %n【出错文件】%F
  41.                                   %n【出错行号】%L
  42.                                   %n【出错的类】%logger 属性[%property{NDC}]
  43.                                   %n【错误描述】%message
  44.                                   %n【错误详情】%newline"/>
  45.                         </layout>
  46.                         <filter type="log4net.Filter.LevelRangeFilter,log4net">
  47.                                 <levelMin value="ERROR" />
  48.                                 <levelMax value="FATAL" />
  49.                         </filter>
  50.                 </appender>
  51.                
  52.                
  53.                
  54.                 <appender name="DebugAppender" type="log4net.Appender.RollingFileAppender">
  55.                        
  56.                         <file value="log/debug/debug_" />
  57.                        
  58.                         <appendToFile value="true"/>
  59.                        
  60.                         <rollingStyle value="Date"/>
  61.                        
  62.                         <datePattern value="yyyy-MM-dd'.log'"/>
  63.                        
  64.                         <staticLogFileName value="false"/>
  65.                        
  66.                         <param name="MaxSizeRollBackups" value="100"/>
  67.                        
  68.                         <maximumFileSize value="50MB" />
  69.                        
  70.                         <layout type="log4net.Layout.PatternLayout">
  71.                                
  72.                                
  73.                                
  74.                                
  75.                                
  76.                                 <conversionPattern value="%n==========
  77.                                   %n【日志级别】%-2level
  78.                                   %n【记录时间】%date
  79.                                   %n【执行时间】[%r]毫秒
  80.                                   %n【debug文件】%F
  81.                                   %n【debug行号】%L
  82.                                   %n【debug类】%logger 属性[%property{NDC}]
  83.                                   %n【debug描述】%message"/>
  84.                         </layout>
  85.                         <filter type="log4net.Filter.LevelRangeFilter,log4net">
  86.                                 <levelMin value="DEBUG" />
  87.                                 <levelMax value="WARN" />
  88.                         </filter>
  89.                 </appender>
  90.                
  91.                
  92.                
  93.                 <appender name="INFOAppender" type="log4net.Appender.RollingFileAppender">
  94.                        
  95.                         <file value="log/info/info_" />
  96.                        
  97.                         <appendToFile value="true"/>
  98.                        
  99.                         <rollingStyle value="Date"/>
  100.                        
  101.                         <datePattern value="yyyy-MM-dd'.log'"/>
  102.                        
  103.                         <staticLogFileName value="false"/>
  104.                        
  105.                         <param name="MaxSizeRollBackups" value="100"/>
  106.                        
  107.                         <maximumFileSize value="50MB" />
  108.                        
  109.                         <layout type="log4net.Layout.PatternLayout">
  110.                                
  111.                                
  112.                                
  113.                                
  114.                                
  115.                                 <conversionPattern value="%n==========
  116.                                   %n【日志级别】%-2level
  117.                                   %n【记录时间】%date
  118.                                   %n【执行时间】[%r]毫秒
  119.                                   %n【info文件】%F
  120.                                   %n【info行号】%L
  121.                                   %n【info类】%logger 属性[%property{NDC}]
  122.                                   %n【info描述】%message"/>
  123.                         </layout>
  124.                         <filter type="log4net.Filter.LevelRangeFilter,log4net">
  125.                                 <levelMin value="INFO" />
  126.                                 <levelMax value="WARN" />
  127.                         </filter>
  128.                 </appender>
  129.                
  130.                 <root>
  131.                        
  132.                         <level value="ALL" />
  133.                         <appender-ref ref="DebugAppender" />
  134.                         <appender-ref ref="ErrorAppender" />
  135.                         <appender-ref ref="INFOAppender" />
  136.                 </root>
  137.         </log4net>
  138. </configuration>
复制代码
(3)并且右击【log4net.config】文件,点击【属性】,然后将[复制到输出目录]设置为【始终复制】。

(4)然后安装log4net。在项目目录中右击【引用】,然后点击【管理NuGet程序包】

(5)然后点击浏览,搜索【log4net】,右侧点击安装。

(6)重要:然后配置【AssemblyInfo.cs 】文件,如果不配置,是输出不了日志的。

添加到底部即可:(如果你的【log4net.config】文件路径和我的不一样,记得修改成跟自己配置路径一样的)。

代码:
  1. [assembly: log4net.Config.XmlConfigurator(ConfigFileExtension = "config", ConfigFile = "LogConfig/log4net.config", Watch = true)]
复制代码
(7)在服务启动方法【OnStart】中,配置启动log4net。

代码:
  1.          XmlConfigurator.Configure(new System.IO.FileInfo("LogConfig/log4net.config"));
复制代码
(8)然后就可以使用log4net了,首先在Windows服务中获得log4net的实例。

代码:
  1. private static readonly log4net.ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
复制代码
4.2-配置连接信息

(1)然后右击【RocketMQ.Consumer】项目下,点击【引用】,然后将【RocketMQ.Core】项目勾选上确定。

(2)然后将前期准备的基本信息放在配置文件中。在【App.config】文件进行配置。

代码:
  1. [/code](3)然后创建一个【Config】文件夹,写一个获得【ConfigSetting】配置文件的帮助类。
  2. [align=center][/align]
  3. 代码:
  4. [code]    /// <summary>
  5.     /// 配置文件
  6.     /// </summary>
  7.     public class ConfigGeter
  8.     {
  9.         private static T TryGetValueFromConfig<T>(Func<string, T> parseFunc, Func<T> defaultTValueFunc, [CallerMemberName] string key = "", string supressKey = "")
  10.         {
  11.             try
  12.             {
  13.                 if (!string.IsNullOrWhiteSpace(supressKey))
  14.                 {
  15.                     key = supressKey;
  16.                 }
  17.                 var node = ConfigurationManager.AppSettings[key];
  18.                 return !string.IsNullOrEmpty(node) ? parseFunc(node) : defaultTValueFunc();
  19.             }
  20.             catch (Exception ex)
  21.             {
  22.                 return default(T);
  23.             }
  24.         }
  25.         #region 消息队列:RocketMQ
  26.         /// <summary>
  27.         /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
  28.         /// </summary>
  29.         public static string ons_access_key
  30.         {
  31.             get
  32.             {
  33.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  34.             }
  35.         }
  36.         /// <summary>
  37.         /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
  38.         /// </summary>
  39.         public static string ons_secret_key
  40.         {
  41.             get
  42.             {
  43.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  44.             }
  45.         }
  46.         /// <summary>
  47.         ///  您在云消息队列 RocketMQ 版控制台创建的Topic。
  48.         /// </summary>
  49.         public static string ons_topic
  50.         {
  51.             get
  52.             {
  53.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  54.             }
  55.         }
  56.         /// <summary>
  57.         /// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
  58.         /// </summary>
  59.         public static string ons_groupId
  60.         {
  61.             get
  62.             {
  63.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  64.             }
  65.         }
  66.         /// <summary>
  67.         /// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
  68.         /// </summary>
  69.         public static string ons_name_srv
  70.         {
  71.             get
  72.             {
  73.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  74.             }
  75.         }
  76.         /// <summary>
  77.         /// 消息来源(生产者/消费端客户端编码)
  78.         /// </summary>
  79.         public static string ons_client_code
  80.         {
  81.             get
  82.             {
  83.                 return TryGetValueFromConfig(_ => _, () => string.Empty);
  84.             }
  85.         }
  86.         #endregion
  87.     }
复制代码
4.3-封装核心代码

(1)新建一个【ConsumerStartup】文件,这个类继承自【MessageListener】类,然后实现consume方法,这个方法主要是消费者具体要执行的任务。

代码:
  1. /// <summary>
  2.     /// 消费端启动
  3.     /// </summary>
  4.     public class ConsumerStartup : MessageListener
  5.     {
  6.         private readonly static ILog logger = LogManager.GetLogger(typeof(ConsumerStartup));
  7.         private readonly static ConsumerManager manager = new ConsumerManager();
  8.         private readonly string _consumerClientCode;
  9.         private readonly string _ons_groupId;
  10.         /// <summary>
  11.         /// 构造函数
  12.         /// </summary>
  13.         /// <param name="consumerClientCode">消费者客户端Code</param>
  14.         /// <param name="ons_groupId">消费者消费的分组</param>
  15.         public ConsumerStartup(string consumerClientCode, string ons_groupId)
  16.         {
  17.             _consumerClientCode = consumerClientCode;
  18.             _ons_groupId = ons_groupId;
  19.         }
  20.         ~ConsumerStartup()
  21.         {
  22.         }
  23.         /// <summary>
  24.         /// 消费者任务
  25.         /// </summary>
  26.         /// <param name="value"></param>
  27.         /// <param name="context"></param>
  28.         /// <returns></returns>
  29.         public override ons.Action consume(Message value, ConsumeContext context)
  30.         {
  31.             Console.WriteLine("【消费者任务】:消费者消息进来了...");
  32.             logger.Info($"【消费者任务】:消费者消息进来了...");
  33.             string topic = value.getTopic();
  34.             string business_id = value.getKey();
  35.             string message_id = value.getMsgID();
  36.             string msg_tag = value.getTag();
  37.             byte[] bytes = Encoding.Default.GetBytes(value.getBody());
  38.             string msg_body = Encoding.Default.GetString(bytes);
  39.             if (string.IsNullOrEmpty(msg_body))
  40.             {
  41.                 return ons.Action.CommitMessage;
  42.             };
  43.             string log_body = $"本次消费的消息:【消费序列:{value.getQueueOffset()}】【消息key:{business_id}】【消息ID:{message_id}】【Tag:{msg_tag}】";
  44.             Console.WriteLine(log_body);
  45.             logger.Info(log_body);
  46.             logger.Info($"【消费内容】:{msg_body}");
  47.             int status = 1;
  48.             string error_msg = "";
  49.             long sys_msg_id = 0;
  50.             QueueOnsCommonModel consumerModel = null;
  51.             try
  52.             {
  53.                 //调度到具体的消费者
  54.                 consumerModel = JsonUtility.DeserializeJSON<QueueOnsCommonModel>(msg_body);
  55.                 if (consumerModel != null)
  56.                 {
  57.                     logger.Info($"【消费者任务】:真正开始执行了(消息key:{consumerModel.MessageId})");
  58.                     if (!long.TryParse(consumerModel.MessageId, out sys_msg_id))
  59.                     {
  60.                         logger.Info("sys_msg_id 转换失败!");
  61.                     }
  62.                     manager.ExecuteConsumer(consumerModel.Tag, consumerModel.EventType, consumerModel);
  63.                     logger.Info($"【消费者任务】:执行完成了(消息key:{consumerModel.MessageId})");
  64.                 }
  65.                 else
  66.                 {
  67.                     status = 2;
  68.                     error_msg = "【调度到具体的消费者】解析消息body内容为空,无法进行消费";
  69.                     logger.Error($"【调度到具体的消费者】解析消息body内容为空,无法进行消费");
  70.                 }
  71.             }
  72.             catch (Exception ex)
  73.             {
  74.                 logger.Error($"【消费者任务】:发生异常了:{ex.Message}", ex);
  75.                 status = 2;
  76.                 error_msg = ex.Message;
  77.             }
  78.             return ons.Action.CommitMessage;
  79.         }
  80.     }
复制代码
4.4-启动消费者

在【RocketMQConsumerService.cs】文件OnStart方法中创建生产者,主要就是从配置文件中获得配置信息,然后调用【QueueOnsProducer.CreatePushConsumer】方法创建消息队列生产者,通过调用【QueueOnsProducer.SetPushConsumer】方法来设置生产者,最后通过调用【QueueOnsProducer.StartPushConsumer】方法来启动生产者。

代码:
  1. //创建消费者
  2.             string ons_access_key = ConfigSetting.ons_access_key;
  3.             string ons_secret_key = ConfigSetting.ons_secret_key;
  4.             string ons_topic = ConfigSetting.ons_topic;
  5.             string ons_groupId = ConfigSetting.ons_groupId;
  6.             string ons_name_srv = ConfigSetting.ons_name_srv;
  7.             string ons_client_code = ConfigSetting.ons_client_code;
  8.             QueueOnsProducer.CreatePushConsumer(new ONSPropertyConfigModel()
  9.             {
  10.                 AccessKey = ons_access_key,
  11.                 SecretKey = ons_secret_key,
  12.                 Topics = ons_topic,
  13.                 GroupId = ons_groupId,
  14.                 NAMESRV_ADDR = ons_name_srv,
  15.                 OnsClientCode = ons_client_code,
  16.             });
  17.             //设置消费者
  18.             QueueOnsProducer.SetPushConsumer(new ConsumerStartup(ons_client_code, ons_groupId), "*");
  19.             //启动消费者
  20.             QueueOnsProducer.StartPushConsumer();
复制代码
4.5-接收消费消息

我们如果要创建一个具体消费者去消费某一条消息,需要先创建一个类,然后实现【IConsumerMsg】接口中的【Consume】方法。需要在这个方法上面标注两个特性,也可以是一个(意味着满足一个条件即可),一个是【ConsumerTag】Tag标签,表示要消费哪个生产的Tag标签,一个是【EventType】,表示要消费哪个生产的事件类型。如果有多个不同的消费者,就按照上面的方式创建多个即可。[西瓜程序猿]这边创建一个名为【SampleConsumer】的类作为例子。

代码:
  1. /// <summary>
  2.     /// 消费者Sample
  3.     /// </summary>
  4.     [ConsumerTag(QueueTagConsts.XG_Blog_Sample_Tag)]
  5.     [EventType(QueueOnsEventType.RocketMQ_TEST)]
  6.     public class SampleConsumer :  IConsumerMsg
  7.     {
  8.         private readonly static ILog logger = LogManager.GetLogger(typeof(SampleConsumer));
  9.         public void Consume(QueueOnsCommonModel model)
  10.         {
  11.             logger.Info($"【西瓜程序猿-消费者Sample】:测试消费者进来了");
  12.             if (model != null)
  13.             {
  14.                 Console.WriteLine("tag:" + model.Tag);
  15.                 Console.WriteLine("body" + model.Body);
  16.             }
  17.             Console.WriteLine("【西瓜程序猿-消费者Sample】消费成功了!");
  18.         }
  19.     }
复制代码
五、发布消费端

然后来介绍一下如何部署消费端。之前看评论区说使用NSSM部署安装Window服务更方便,后面我也试了一下确实还挺好用,但是针对目前这个程序始终运行不起来(各位大佬如果有更好的方法和建议可以在评论区提出来哈),所以这次还是用之前的方法来介绍如何部署Windows服务了。
5.1-服务基本配置

(1)点击我们的服务【RocketMQConsumerService.cs】,然后右击点击【添加安装程序】。

(2)然后可以看到下面多出来了一个文件,就是安装程序。


(3)然后可以修改基本信息,服务组件中的【服务名称】【服务描述】等等。我们右击【serviceInstall1】点击属性,然后进行修改。


(4)然后点击【serviceProcessInstall1】右击属性,进行修改。



5.2-服务运行与发布

当我们直接按F5或者其他方式直接运行项目时,会提示:"无法从命令行或调试程序启动服务。必须首先安装 Windows服务(使用installutil.exe),然后用ServerExplorer、Windows服务管理工具或 NET START命令启动它。"。不是这样运行的,跟着下面步骤来操作运行与发布Windows服务吧。

前提注意:如果你设置的目标平台是x64,打开的目录会不一样,不然导致服务运行不起来。可以右击项目名,点击【属性】——>【生成】——>【目标平台】查看。

如果不是x64版本,复制这个地址:
C:\Windows\Microsoft.NET\Framework\v4.0.30319
如果是x64版本,复制这个地址:
C:\Windows\Microsoft.NET\Framework64\v4.0.30319
不然会报类似这种错误:在初始化安装时发生异常: System.BadImageFormatException: 未能加载文件或程序集...
(1)然后我们把上面的地址(根据自己的环境选择)添加到环境变量中。点击【控制面板】——>【系统和安全】

(2)然后点击【系统】

(3)点击【高级系统设置】

(4)点击【环境变量】

(5)在【系统变量】中找到Path,然后点击【编辑】。

(6)然后点击【新建】,然后把我们拷贝的目录复制到这里。然后点击确认即可。

(7)测试是否配置成功,输入这个命令查看一下【InstallUtil】,如果是下面这样的内容说明成功了。

(8)然后编辑解决方案和项目。

(9)以管理员身份运行cmd命令,然后安装服务。
InstallUtil 项目启动执行文件全路径
西瓜程序猿的例子:
InstallUtil D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe

(10)出现这个说明安装成功了。

(11)打开服务管理器,找到要启动的服务,然后右击启动服务。

(12)如果要卸载服务,可以运行这个命令:
InstallUtil /u 项目启动执行文件全路径
西瓜程序猿的例子:
InstallUtil /u D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe


5.3-常见命令

1、安装服务:InstallUtil 项目启动执行文件全路径
2、启动服务:net start 服务名
3、停止服务:net stop 服务名
4、卸载服务:InstallUtil /u 项目启动执行文件全路径

5.4-测试消费消息

(1)首先可以先看一下日志,看一下这个消费者服务是否启动成功了。

(2)然后再日志里面记录下消费的消费,在根据消息Key或者消息ID在阿里云后台查询一下这一条消息的【消息轨迹】,如果提示消费成功就说明确实已经进行消费了。

最后,还有可能会出现消息生产失败、消息消费失败等场景,大佬们可以根据实际情况进行设计和跳转哈。

六、防踩坑指南

5.1:ons.ONSClient4CPPPINVOKE的类型初始值设定项引发异常

异常详情:
“ons.ONSClient4CPPPINVOKE”的类型初始值设定项引发异常。
解决方案:
第一步:在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Producer】项目,然后点击【属性】,点击左侧选项的【生成】,然后将目标平台改为【x64】。

第二步:将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin】目录下。


5.2:Topic Route does not exist

异常详情:
Topic Route does not exist, Topic:XG_CXY_Test exception:msg: No route info of this topic, ,error:-1,in file  line:581
See https://github.com/alibaba/ons/issues/7 for further details.”

异常截图:

解决方案:
这个问题一般是没有链接上RocketMQ,检查一下配置文件中信息是否与RocketMQ信息一致。尤其是[ons_name_srv] RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”切记不要加"http://或者https
来源:https://www.cnblogs.com/kimiliucn/archive/2023/08/30/17667200.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具