翼度科技»论坛 编程开发 .net 查看内容

手写MSMQ微软消息队列收发工具类

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
一、MSMQ介绍
MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。
两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。
应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。
二、工具类封装
封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:

  • 功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。
  • 配置简单。
  • 同时支持多个队列的收发处理。
     
几个核心类

  • MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。
  1.    public abstract class MQHandler
  2.     {
  3.         protected MqCfgEntity cfg = null;
  4.         //是否正在处理消息
  5.         private bool IsProcessingMessage = false;
  6.         public MQHandler(MqCfgEntity cfg)
  7.         {
  8.             this.cfg = cfg;
  9.         }
  10.         public void Start()
  11.         {
  12.             while (true)
  13.             {
  14.                 try
  15.                 {
  16.                     if (IsProcessingMessage) return;
  17.                     IsProcessingMessage = true;//正在处理消息
  18.                     if ("Send".Equals(cfg.ProcessType))
  19.                     {
  20.                         Send();
  21.                     }
  22.                     else if ("Receive".Equals(cfg.ProcessType))
  23.                     {
  24.                         Receive();
  25.                     }
  26.                     IsProcessingMessage = false; //消息处理完成
  27.                 }
  28.                 catch (Exception ex)
  29.                 {
  30.                     Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  31.                     IsProcessingMessage = false; //消息处理完成
  32.                 }
  33.                 Thread.Sleep(cfg.SplitSeconds * 1000);
  34.             }
  35.         }
  36.         /// <summary>
  37.         /// 备份已发送的文件,file为待备份完整文件名
  38.         /// </summary>
  39.         protected void BackSendFile(string file)
  40.         {
  41.             if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())
  42.             {
  43.                 FileInfo info = new FileInfo(file);
  44.                 string backPath = cfg.BackPath;
  45.                 if (cfg.BackFormat.IsNotNullOrEmpty())
  46.                 {
  47.                     backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";
  48.                     if (!Directory.Exists(backPath))
  49.                     {
  50.                         Directory.CreateDirectory(backPath);
  51.                     }
  52.                 }
  53.                 string backName = $"{backPath}/{info.Name}";
  54.                 if (File.Exists(backName))
  55.                 {
  56.                     backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";
  57.                 }
  58.                 File.Move(file, backName);
  59.             }
  60.         }
  61.         private void Send()
  62.         {
  63.             var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);
  64.             if (names != null && names.Count() > 0)
  65.             {
  66.                 var files = names.ToList().Select(f => f.ForMatPath());
  67.                 foreach (var file in files)
  68.                 {
  69.                     if (file.TryOpenFile() && Send(file))
  70.                     {
  71.                         Log4Net.Info($"{file}已发送");
  72.                         BackSendFile(file);
  73.                         Log4Net.Info($"{file}已备份");
  74.                     }
  75.                 }
  76.             }
  77.         }
  78.         public abstract bool Send(string file);
  79.         public abstract void Receive();
  80.     }
复制代码
View Code

  • MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。
  1.    public class MSMQHandler : MQHandler
  2.     {
  3.         private XmlMessageFormatter formatter = null;
  4.         //把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱
  5.         private Encoding encoding = Encoding.UTF8;
  6.         public MSMQHandler(MqCfgEntity cfg) : base(cfg)
  7.         {
  8.             Type type = GetMsgType(cfg.MessageType);
  9.             formatter = new XmlMessageFormatter(new Type[] { type });
  10.         }
  11.         public override void Receive()
  12.         {
  13.             MessageQueue queue = null;
  14.             try
  15.             {
  16.                 queue = new MessageQueue(cfg.Queue);
  17.                 int num = queue.GetAllMessages().Length;
  18.                 for (int i = 0; i < num; i++)
  19.                 {
  20.                     ReceiveMessage(queue);
  21.                 }
  22.             }
  23.             catch (Exception ex)
  24.             {
  25.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  26.             }
  27.             finally
  28.             {
  29.                 if (queue != null) queue.Dispose();
  30.             }
  31.         }
  32.         private void ReceiveMessage(MessageQueue queue)
  33.         {
  34.             System.Messaging.Message message = null;
  35.             try
  36.             {
  37.                 message = queue.Receive();
  38.                 message.Formatter = formatter;
  39.                 string toFile = $"{cfg.FilePath}/{message.Label}";
  40.                 if ("file".Equals(cfg.MessageType))
  41.                 {
  42.                     SaveMessageAsBinaryFile(message, toFile);
  43.                 }
  44.                 else if ("xml".Equals(cfg.MessageType))
  45.                 {
  46.                     var doc = (XmlDocument)message.Body;
  47.                     doc.Save(toFile);
  48.                 }
  49.                 else if ("txt".Equals(cfg.MessageType))
  50.                 {
  51.                     var txt = (string)message.Body;
  52.                     SaveMessageAsTxt(message, toFile);
  53.                 }
  54.                 Log4Net.Info($"收到消息,已保存,{toFile}");
  55.             }
  56.             catch (Exception ex)
  57.             {
  58.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  59.             }
  60.             finally
  61.             {
  62.                 if (message != null) message.Dispose();
  63.             }
  64.         }
  65.         private void SaveMessageAsTxt(Message message, string toFile)
  66.         {
  67.             FileStream fs = null;
  68.             try
  69.             {
  70.                 fs = new FileStream(toFile, FileMode.Create);
  71.                 string content = (string)message.Body;
  72.                 var bts = encoding.GetBytes(content);
  73.                 fs.Write(bts, 0, bts.Length);
  74.             }
  75.             catch (Exception ex)
  76.             {
  77.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  78.             }
  79.             finally
  80.             {
  81.                 if (fs != null) fs.Dispose();
  82.             }
  83.         }
  84.         private void SaveMessageAsBinaryFile(Message message, string toFile)
  85.         {
  86.             FileStream fs = null;
  87.             try
  88.             {
  89.                 fs = new FileStream(toFile, FileMode.Create);
  90.                 var bts = (byte[])message.Body;
  91.                 fs.Write(bts, 0, bts.Length);
  92.             }
  93.             catch (Exception ex)
  94.             {
  95.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  96.             }
  97.             finally
  98.             {
  99.                 if (fs != null) fs.Dispose();
  100.             }
  101.         }
  102.         public override bool Send(string file)
  103.         {
  104.             bool success = true;
  105.             FileInfo fileInfo = new FileInfo(file);
  106.             MessageQueue myQueue = null;
  107.             try
  108.             {
  109.                 myQueue = new MessageQueue(cfg.Queue);
  110.                 object body = null;
  111.                 if ("file".Equals(cfg.MessageType))
  112.                 {
  113.                     FileStream fs = null;
  114.                     try
  115.                     {
  116.                         fs = new FileStream(file, FileMode.Open);
  117.                         byte[] bts = new byte[fs.Length];
  118.                         fs.Read(bts, 0, bts.Length);
  119.                         body = bts;
  120.                     }
  121.                     catch (Exception ex)
  122.                     {
  123.                         Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  124.                     }
  125.                     finally
  126.                     {
  127.                         if (fs != null) fs.Dispose();
  128.                     }
  129.                 }
  130.                 else if ("xml".Equals(cfg.MessageType))
  131.                 {
  132.                     XmlDocument doc = new XmlDocument();
  133.                     doc.Load(file);
  134.                     body = doc;
  135.                 }
  136.                 else if ("txt".Equals(cfg.MessageType))
  137.                 {
  138.                     FileStream fs = null;
  139.                     try
  140.                     {
  141.                         fs = new FileStream(file, FileMode.Open);
  142.                         byte[] bts = new byte[fs.Length];
  143.                         fs.Read(bts, 0, bts.Length);
  144.                         string content = encoding.GetString(bts);
  145.                         body = content;
  146.                     }
  147.                     catch (Exception ex)
  148.                     {
  149.                         Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  150.                     }
  151.                     finally
  152.                     {
  153.                         if (fs != null) fs.Dispose();
  154.                     }
  155.                 }
  156.                 Push(fileInfo.Name, myQueue, body);
  157.             }
  158.             catch (Exception ex)
  159.             {
  160.                 success = false;
  161.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  162.             }
  163.             finally
  164.             {
  165.                 if (myQueue != null) myQueue.Dispose();
  166.             }
  167.             return success;
  168.         }
  169.         //往队列上推送消息
  170.         private void Push(string fileName, MessageQueue myQueue, object body)
  171.         {
  172.             System.Messaging.Message message = null;
  173.             try
  174.             {
  175.                 message = new System.Messaging.Message(body);
  176.                 message.Formatter = formatter;
  177.                 message.Label = fileName;
  178.                 if (cfg.IsTransQueue)
  179.                 {
  180.                     using (MessageQueueTransaction trans = new MessageQueueTransaction())
  181.                     {
  182.                         trans.Begin();
  183.                         myQueue.Send(message, trans);
  184.                         trans.Commit();
  185.                     }
  186.                 }
  187.                 else
  188.                 {
  189.                     myQueue.Send(message);
  190.                 }
  191.             }
  192.             catch (Exception ex)
  193.             {
  194.                 Log4Net.Error($"{ex.Message},{ex.StackTrace}");
  195.             }
  196.             finally
  197.             {
  198.                 if (message != null) message.Dispose();
  199.             }
  200.         }
  201.         /// <summary>
  202.         /// 根据配置文件的类型,返回MQ队列上的消息类型
  203.         /// </summary>
  204.         private Type GetMsgType(string code)
  205.         {
  206.             Type type = null;
  207.             switch (code)
  208.             {
  209.                 case "file": type = typeof(byte[]); break;
  210.                 case "txt": type = typeof(string); break;
  211.                 case "xml": type = typeof(XmlDocument); break;
  212.             }
  213.             return type;
  214.         }
  215.     }
复制代码
View Code
 
配置文件说明

  • mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。
  • Msmq节点字段说明:

    • ProcessType:Send或Receive,表示用于发送或接收消息。
    • Queue:队列名称。
    • FilePath:待发送的文件所在目录,或接收到的文件的存放目录。
    • FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。
    • SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。
    • BackPath:Send时才配置,消息发送以后文件备份到哪个目录。
    • BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。
    • MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。
    • IsTransQueue:true或false,表示队列是否为事务性队列。

其它说明

  • 程序运行环境:.net framework 4.5+
  • 程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。
  • 完整项目代码:关注以下公众号,后台回复"msmq"获取


来源:https://www.cnblogs.com/cy2011/p/18472959
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
来自手机

举报 回复 使用道具