骑只蚂蚁游世界 发表于 2024-10-18 12:48:16

手写MSMQ微软消息队列收发工具类

一、MSMQ介绍
MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。
两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。
应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。
二、工具类封装
封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:

[*]功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。
[*]配置简单。
[*]同时支持多个队列的收发处理。
 
几个核心类

[*]MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。
   public abstract class MQHandler
    {
      protected MqCfgEntity cfg = null;
      //是否正在处理消息
      private bool IsProcessingMessage = false;

      public MQHandler(MqCfgEntity cfg)
      {
            this.cfg = cfg;
      }

      public void Start()
      {
            while (true)
            {
                try
                {
                  if (IsProcessingMessage) return;
                  IsProcessingMessage = true;//正在处理消息
                  if ("Send".Equals(cfg.ProcessType))
                  {
                        Send();
                  }
                  else if ("Receive".Equals(cfg.ProcessType))
                  {
                        Receive();
                  }
                  IsProcessingMessage = false; //消息处理完成
                }
                catch (Exception ex)
                {
                  Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                  IsProcessingMessage = false; //消息处理完成
                }
                Thread.Sleep(cfg.SplitSeconds * 1000);
            }
      }

      /// <summary>
      /// 备份已发送的文件,file为待备份完整文件名
      /// </summary>
      protected void BackSendFile(string file)
      {
            if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())
            {
                FileInfo info = new FileInfo(file);
                string backPath = cfg.BackPath;
                if (cfg.BackFormat.IsNotNullOrEmpty())
                {
                  backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";
                  if (!Directory.Exists(backPath))
                  {
                        Directory.CreateDirectory(backPath);
                  }
                }
                string backName = $"{backPath}/{info.Name}";
                if (File.Exists(backName))
                {
                  backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";
                }
                File.Move(file, backName);
            }
      }

      private void Send()
      {
            var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);
            if (names != null && names.Count() > 0)
            {
                var files = names.ToList().Select(f => f.ForMatPath());
                foreach (var file in files)
                {
                  if (file.TryOpenFile() && Send(file))
                  {
                        Log4Net.Info($"{file}已发送");
                        BackSendFile(file);
                        Log4Net.Info($"{file}已备份");
                  }
                }
            }
      }

      public abstract bool Send(string file);

      public abstract void Receive();
    }View Code

[*]MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。
   public class MSMQHandler : MQHandler
    {
      private XmlMessageFormatter formatter = null;
      //把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱
      private Encoding encoding = Encoding.UTF8;

      public MSMQHandler(MqCfgEntity cfg) : base(cfg)
      {
            Type type = GetMsgType(cfg.MessageType);
            formatter = new XmlMessageFormatter(new Type[] { type });
      }

      public override void Receive()
      {
            MessageQueue queue = null;
            try
            {
                queue = new MessageQueue(cfg.Queue);
                int num = queue.GetAllMessages().Length;
                for (int i = 0; i < num; i++)
                {
                  ReceiveMessage(queue);
                }
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (queue != null) queue.Dispose();
            }
      }

      private void ReceiveMessage(MessageQueue queue)
      {
            System.Messaging.Message message = null;
            try
            {
                message = queue.Receive();
                message.Formatter = formatter;
                string toFile = $"{cfg.FilePath}/{message.Label}";
                if ("file".Equals(cfg.MessageType))
                {
                  SaveMessageAsBinaryFile(message, toFile);
                }
                else if ("xml".Equals(cfg.MessageType))
                {
                  var doc = (XmlDocument)message.Body;
                  doc.Save(toFile);
                }
                else if ("txt".Equals(cfg.MessageType))
                {
                  var txt = (string)message.Body;
                  SaveMessageAsTxt(message, toFile);
                }
                Log4Net.Info($"收到消息,已保存,{toFile}");
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (message != null) message.Dispose();
            }
      }

      private void SaveMessageAsTxt(Message message, string toFile)
      {
            FileStream fs = null;
            try
            {
                fs = new FileStream(toFile, FileMode.Create);
                string content = (string)message.Body;
                var bts = encoding.GetBytes(content);
                fs.Write(bts, 0, bts.Length);
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (fs != null) fs.Dispose();
            }
      }

      private void SaveMessageAsBinaryFile(Message message, string toFile)
      {
            FileStream fs = null;
            try
            {
                fs = new FileStream(toFile, FileMode.Create);
                var bts = (byte[])message.Body;
                fs.Write(bts, 0, bts.Length);
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (fs != null) fs.Dispose();
            }
      }

      public override bool Send(string file)
      {
            bool success = true;
            FileInfo fileInfo = new FileInfo(file);
            MessageQueue myQueue = null;
            try
            {
                myQueue = new MessageQueue(cfg.Queue);
                object body = null;
                if ("file".Equals(cfg.MessageType))
                {
                  FileStream fs = null;
                  try
                  {
                        fs = new FileStream(file, FileMode.Open);
                        byte[] bts = new byte;
                        fs.Read(bts, 0, bts.Length);
                        body = bts;
                  }
                  catch (Exception ex)
                  {
                        Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                  }
                  finally
                  {
                        if (fs != null) fs.Dispose();
                  }
                }
                else if ("xml".Equals(cfg.MessageType))
                {
                  XmlDocument doc = new XmlDocument();
                  doc.Load(file);
                  body = doc;
                }
                else if ("txt".Equals(cfg.MessageType))
                {
                  FileStream fs = null;
                  try
                  {
                        fs = new FileStream(file, FileMode.Open);
                        byte[] bts = new byte;
                        fs.Read(bts, 0, bts.Length);
                        string content = encoding.GetString(bts);
                        body = content;
                  }
                  catch (Exception ex)
                  {
                        Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                  }
                  finally
                  {
                        if (fs != null) fs.Dispose();
                  }
                }
                Push(fileInfo.Name, myQueue, body);
            }
            catch (Exception ex)
            {
                success = false;
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (myQueue != null) myQueue.Dispose();
            }
            return success;
      }

      //往队列上推送消息
      private void Push(string fileName, MessageQueue myQueue, object body)
      {
            System.Messaging.Message message = null;
            try
            {
                message = new System.Messaging.Message(body);
                message.Formatter = formatter;
                message.Label = fileName;
                if (cfg.IsTransQueue)
                {
                  using (MessageQueueTransaction trans = new MessageQueueTransaction())
                  {
                        trans.Begin();
                        myQueue.Send(message, trans);
                        trans.Commit();
                  }
                }
                else
                {
                  myQueue.Send(message);
                }
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (message != null) message.Dispose();
            }
      }

      /// <summary>
      /// 根据配置文件的类型,返回MQ队列上的消息类型
      /// </summary>
      private Type GetMsgType(string code)
      {
            Type type = null;
            switch (code)
            {
                case "file": type = typeof(byte[]); break;
                case "txt": type = typeof(string); break;
                case "xml": type = typeof(XmlDocument); break;
            }
            return type;
      }
    }View Code

[*]XmlNodeSerializer:实体与xml互转,用于解析自定义的配置文件。代码见公众号《手写xml序列化与反序列化工具类》
 
配置文件说明

[*]mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。
[*]Msmq节点字段说明:

[*]ProcessType:Send或Receive,表示用于发送或接收消息。
[*]Queue:队列名称。
[*]FilePath:待发送的文件所在目录,或接收到的文件的存放目录。
[*]FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。
[*]SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。
[*]BackPath:Send时才配置,消息发送以后文件备份到哪个目录。
[*]BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。
[*]MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。
[*]IsTransQueue:true或false,表示队列是否为事务性队列。

其它说明

[*]程序运行环境:.net framework 4.5+
[*]程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。
[*]完整项目代码:关注以下公众号,后台回复"msmq"获取


来源:https://www.cnblogs.com/cy2011/p/18472959
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 手写MSMQ微软消息队列收发工具类