|
一、MSMQ介绍
MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。
两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。
应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。
二、工具类封装
封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:
- 功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。
- 配置简单。
- 同时支持多个队列的收发处理。
几个核心类
- MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。
- public abstract class MQHandler
- {
- protected MqCfgEntity cfg = null;
- //是否正在处理消息
- private bool IsProcessingMessage = false;
- public MQHandler(MqCfgEntity cfg)
- {
- this.cfg = cfg;
- }
- public void Start()
- {
- while (true)
- {
- try
- {
- if (IsProcessingMessage) return;
- IsProcessingMessage = true;//正在处理消息
- if ("Send".Equals(cfg.ProcessType))
- {
- Send();
- }
- else if ("Receive".Equals(cfg.ProcessType))
- {
- Receive();
- }
- IsProcessingMessage = false; //消息处理完成
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- IsProcessingMessage = false; //消息处理完成
- }
- Thread.Sleep(cfg.SplitSeconds * 1000);
- }
- }
- /// <summary>
- /// 备份已发送的文件,file为待备份完整文件名
- /// </summary>
- protected void BackSendFile(string file)
- {
- if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())
- {
- FileInfo info = new FileInfo(file);
- string backPath = cfg.BackPath;
- if (cfg.BackFormat.IsNotNullOrEmpty())
- {
- backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";
- if (!Directory.Exists(backPath))
- {
- Directory.CreateDirectory(backPath);
- }
- }
- string backName = $"{backPath}/{info.Name}";
- if (File.Exists(backName))
- {
- backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";
- }
- File.Move(file, backName);
- }
- }
- private void Send()
- {
- var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);
- if (names != null && names.Count() > 0)
- {
- var files = names.ToList().Select(f => f.ForMatPath());
- foreach (var file in files)
- {
- if (file.TryOpenFile() && Send(file))
- {
- Log4Net.Info($"{file}已发送");
- BackSendFile(file);
- Log4Net.Info($"{file}已备份");
- }
- }
- }
- }
- public abstract bool Send(string file);
- public abstract void Receive();
- }
复制代码 View Code
- MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。
- public class MSMQHandler : MQHandler
- {
- private XmlMessageFormatter formatter = null;
- //把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱
- private Encoding encoding = Encoding.UTF8;
- public MSMQHandler(MqCfgEntity cfg) : base(cfg)
- {
- Type type = GetMsgType(cfg.MessageType);
- formatter = new XmlMessageFormatter(new Type[] { type });
- }
- public override void Receive()
- {
- MessageQueue queue = null;
- try
- {
- queue = new MessageQueue(cfg.Queue);
- int num = queue.GetAllMessages().Length;
- for (int i = 0; i < num; i++)
- {
- ReceiveMessage(queue);
- }
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (queue != null) queue.Dispose();
- }
- }
- private void ReceiveMessage(MessageQueue queue)
- {
- System.Messaging.Message message = null;
- try
- {
- message = queue.Receive();
- message.Formatter = formatter;
- string toFile = $"{cfg.FilePath}/{message.Label}";
- if ("file".Equals(cfg.MessageType))
- {
- SaveMessageAsBinaryFile(message, toFile);
- }
- else if ("xml".Equals(cfg.MessageType))
- {
- var doc = (XmlDocument)message.Body;
- doc.Save(toFile);
- }
- else if ("txt".Equals(cfg.MessageType))
- {
- var txt = (string)message.Body;
- SaveMessageAsTxt(message, toFile);
- }
- Log4Net.Info($"收到消息,已保存,{toFile}");
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (message != null) message.Dispose();
- }
- }
- private void SaveMessageAsTxt(Message message, string toFile)
- {
- FileStream fs = null;
- try
- {
- fs = new FileStream(toFile, FileMode.Create);
- string content = (string)message.Body;
- var bts = encoding.GetBytes(content);
- fs.Write(bts, 0, bts.Length);
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (fs != null) fs.Dispose();
- }
- }
- private void SaveMessageAsBinaryFile(Message message, string toFile)
- {
- FileStream fs = null;
- try
- {
- fs = new FileStream(toFile, FileMode.Create);
- var bts = (byte[])message.Body;
- fs.Write(bts, 0, bts.Length);
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (fs != null) fs.Dispose();
- }
- }
- public override bool Send(string file)
- {
- bool success = true;
- FileInfo fileInfo = new FileInfo(file);
- MessageQueue myQueue = null;
- try
- {
- myQueue = new MessageQueue(cfg.Queue);
- object body = null;
- if ("file".Equals(cfg.MessageType))
- {
- FileStream fs = null;
- try
- {
- fs = new FileStream(file, FileMode.Open);
- byte[] bts = new byte[fs.Length];
- fs.Read(bts, 0, bts.Length);
- body = bts;
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (fs != null) fs.Dispose();
- }
- }
- else if ("xml".Equals(cfg.MessageType))
- {
- XmlDocument doc = new XmlDocument();
- doc.Load(file);
- body = doc;
- }
- else if ("txt".Equals(cfg.MessageType))
- {
- FileStream fs = null;
- try
- {
- fs = new FileStream(file, FileMode.Open);
- byte[] bts = new byte[fs.Length];
- fs.Read(bts, 0, bts.Length);
- string content = encoding.GetString(bts);
- body = content;
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (fs != null) fs.Dispose();
- }
- }
- Push(fileInfo.Name, myQueue, body);
- }
- catch (Exception ex)
- {
- success = false;
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (myQueue != null) myQueue.Dispose();
- }
- return success;
- }
- //往队列上推送消息
- private void Push(string fileName, MessageQueue myQueue, object body)
- {
- System.Messaging.Message message = null;
- try
- {
- message = new System.Messaging.Message(body);
- message.Formatter = formatter;
- message.Label = fileName;
- if (cfg.IsTransQueue)
- {
- using (MessageQueueTransaction trans = new MessageQueueTransaction())
- {
- trans.Begin();
- myQueue.Send(message, trans);
- trans.Commit();
- }
- }
- else
- {
- myQueue.Send(message);
- }
- }
- catch (Exception ex)
- {
- Log4Net.Error($"{ex.Message},{ex.StackTrace}");
- }
- finally
- {
- if (message != null) message.Dispose();
- }
- }
- /// <summary>
- /// 根据配置文件的类型,返回MQ队列上的消息类型
- /// </summary>
- private Type GetMsgType(string code)
- {
- Type type = null;
- switch (code)
- {
- case "file": type = typeof(byte[]); break;
- case "txt": type = typeof(string); break;
- case "xml": type = typeof(XmlDocument); break;
- }
- return type;
- }
- }
复制代码 View Code
配置文件说明
- mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。
- Msmq节点字段说明:
- ProcessType:Send或Receive,表示用于发送或接收消息。
- Queue:队列名称。
- FilePath:待发送的文件所在目录,或接收到的文件的存放目录。
- FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。
- SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。
- BackPath:Send时才配置,消息发送以后文件备份到哪个目录。
- BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。
- MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。
- IsTransQueue:true或false,表示队列是否为事务性队列。
其它说明
- 程序运行环境:.net framework 4.5+
- 程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。
- 完整项目代码:关注以下公众号,后台回复"msmq"获取
来源:https://www.cnblogs.com/cy2011/p/18472959
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|