手写MSMQ微软消息队列收发工具类
一、MSMQ介绍MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。
两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。
应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。
二、工具类封装
封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:
[*]功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。
[*]配置简单。
[*]同时支持多个队列的收发处理。
几个核心类
[*]MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。
public abstract class MQHandler
{
protected MqCfgEntity cfg = null;
//是否正在处理消息
private bool IsProcessingMessage = false;
public MQHandler(MqCfgEntity cfg)
{
this.cfg = cfg;
}
public void Start()
{
while (true)
{
try
{
if (IsProcessingMessage) return;
IsProcessingMessage = true;//正在处理消息
if ("Send".Equals(cfg.ProcessType))
{
Send();
}
else if ("Receive".Equals(cfg.ProcessType))
{
Receive();
}
IsProcessingMessage = false; //消息处理完成
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
IsProcessingMessage = false; //消息处理完成
}
Thread.Sleep(cfg.SplitSeconds * 1000);
}
}
/// <summary>
/// 备份已发送的文件,file为待备份完整文件名
/// </summary>
protected void BackSendFile(string file)
{
if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())
{
FileInfo info = new FileInfo(file);
string backPath = cfg.BackPath;
if (cfg.BackFormat.IsNotNullOrEmpty())
{
backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";
if (!Directory.Exists(backPath))
{
Directory.CreateDirectory(backPath);
}
}
string backName = $"{backPath}/{info.Name}";
if (File.Exists(backName))
{
backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";
}
File.Move(file, backName);
}
}
private void Send()
{
var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);
if (names != null && names.Count() > 0)
{
var files = names.ToList().Select(f => f.ForMatPath());
foreach (var file in files)
{
if (file.TryOpenFile() && Send(file))
{
Log4Net.Info($"{file}已发送");
BackSendFile(file);
Log4Net.Info($"{file}已备份");
}
}
}
}
public abstract bool Send(string file);
public abstract void Receive();
}View Code
[*]MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。
public class MSMQHandler : MQHandler
{
private XmlMessageFormatter formatter = null;
//把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱
private Encoding encoding = Encoding.UTF8;
public MSMQHandler(MqCfgEntity cfg) : base(cfg)
{
Type type = GetMsgType(cfg.MessageType);
formatter = new XmlMessageFormatter(new Type[] { type });
}
public override void Receive()
{
MessageQueue queue = null;
try
{
queue = new MessageQueue(cfg.Queue);
int num = queue.GetAllMessages().Length;
for (int i = 0; i < num; i++)
{
ReceiveMessage(queue);
}
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (queue != null) queue.Dispose();
}
}
private void ReceiveMessage(MessageQueue queue)
{
System.Messaging.Message message = null;
try
{
message = queue.Receive();
message.Formatter = formatter;
string toFile = $"{cfg.FilePath}/{message.Label}";
if ("file".Equals(cfg.MessageType))
{
SaveMessageAsBinaryFile(message, toFile);
}
else if ("xml".Equals(cfg.MessageType))
{
var doc = (XmlDocument)message.Body;
doc.Save(toFile);
}
else if ("txt".Equals(cfg.MessageType))
{
var txt = (string)message.Body;
SaveMessageAsTxt(message, toFile);
}
Log4Net.Info($"收到消息,已保存,{toFile}");
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (message != null) message.Dispose();
}
}
private void SaveMessageAsTxt(Message message, string toFile)
{
FileStream fs = null;
try
{
fs = new FileStream(toFile, FileMode.Create);
string content = (string)message.Body;
var bts = encoding.GetBytes(content);
fs.Write(bts, 0, bts.Length);
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (fs != null) fs.Dispose();
}
}
private void SaveMessageAsBinaryFile(Message message, string toFile)
{
FileStream fs = null;
try
{
fs = new FileStream(toFile, FileMode.Create);
var bts = (byte[])message.Body;
fs.Write(bts, 0, bts.Length);
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (fs != null) fs.Dispose();
}
}
public override bool Send(string file)
{
bool success = true;
FileInfo fileInfo = new FileInfo(file);
MessageQueue myQueue = null;
try
{
myQueue = new MessageQueue(cfg.Queue);
object body = null;
if ("file".Equals(cfg.MessageType))
{
FileStream fs = null;
try
{
fs = new FileStream(file, FileMode.Open);
byte[] bts = new byte;
fs.Read(bts, 0, bts.Length);
body = bts;
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (fs != null) fs.Dispose();
}
}
else if ("xml".Equals(cfg.MessageType))
{
XmlDocument doc = new XmlDocument();
doc.Load(file);
body = doc;
}
else if ("txt".Equals(cfg.MessageType))
{
FileStream fs = null;
try
{
fs = new FileStream(file, FileMode.Open);
byte[] bts = new byte;
fs.Read(bts, 0, bts.Length);
string content = encoding.GetString(bts);
body = content;
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (fs != null) fs.Dispose();
}
}
Push(fileInfo.Name, myQueue, body);
}
catch (Exception ex)
{
success = false;
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (myQueue != null) myQueue.Dispose();
}
return success;
}
//往队列上推送消息
private void Push(string fileName, MessageQueue myQueue, object body)
{
System.Messaging.Message message = null;
try
{
message = new System.Messaging.Message(body);
message.Formatter = formatter;
message.Label = fileName;
if (cfg.IsTransQueue)
{
using (MessageQueueTransaction trans = new MessageQueueTransaction())
{
trans.Begin();
myQueue.Send(message, trans);
trans.Commit();
}
}
else
{
myQueue.Send(message);
}
}
catch (Exception ex)
{
Log4Net.Error($"{ex.Message},{ex.StackTrace}");
}
finally
{
if (message != null) message.Dispose();
}
}
/// <summary>
/// 根据配置文件的类型,返回MQ队列上的消息类型
/// </summary>
private Type GetMsgType(string code)
{
Type type = null;
switch (code)
{
case "file": type = typeof(byte[]); break;
case "txt": type = typeof(string); break;
case "xml": type = typeof(XmlDocument); break;
}
return type;
}
}View Code
[*]XmlNodeSerializer:实体与xml互转,用于解析自定义的配置文件。代码见公众号《手写xml序列化与反序列化工具类》
配置文件说明
[*]mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。
[*]Msmq节点字段说明:
[*]ProcessType:Send或Receive,表示用于发送或接收消息。
[*]Queue:队列名称。
[*]FilePath:待发送的文件所在目录,或接收到的文件的存放目录。
[*]FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。
[*]SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。
[*]BackPath:Send时才配置,消息发送以后文件备份到哪个目录。
[*]BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。
[*]MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。
[*]IsTransQueue:true或false,表示队列是否为事务性队列。
其它说明
[*]程序运行环境:.net framework 4.5+
[*]程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。
[*]完整项目代码:关注以下公众号,后台回复"msmq"获取
来源:https://www.cnblogs.com/cy2011/p/18472959
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页:
[1]