翼度科技»论坛 编程开发 .net 查看内容

MQTTnet 2.8 及 3.0.16 的使用

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
十年河东,十年河西,莫欺少年穷
学无止境,精益求精
netcore3.1控制台应用程序,引入MQTTnet 2.8版本
订阅端:
  1. using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;using MQTTnet;using MQTTnet.Server; using MQTTnet.Client;using System.Threading;using System.Threading.Tasks;using System.Collections.Generic;using MQTTnet.Protocol;namespace swapConsole{    class Program    {        private static MqttClient mqttClient = null;        private static  string topic = "test123ABC";        private static IMqttClientOptions Options        {            get            {                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();                 builder.WithCleanSession(false);                //用户名 密码                builder.WithCredentials("", "");                var id = Guid.NewGuid().ToString();                builder.WithClientId(id);                builder.WithTcpServer("1270.0.0.0", 1883);                return builder.Build();            }        }        static async Task Main(string[] args)        {            MqttFactory factory = new MqttFactory();            if (mqttClient == null)            {                mqttClient = (MqttClient)factory.CreateMqttClient();                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;                mqttClient.Connected += MqttClient_Connected;                mqttClient.Disconnected += async (s, e) =>                 {                     Console.WriteLine("尝试重连!" + Environment.NewLine);                     await ConnectToServer();                 };            }            await ConnectToServer();             Console.ReadLine();        }        ///         /// 连接MQTT服务器        ///         private   static async Task ConnectToServer()        {            try            {                var res =await  mqttClient.ConnectAsync(Options);            }            catch (Exception ex)            {                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);            }        }        ///         /// 连接MQTT服务器触发        ///         ///         ///         private static void MqttClient_Connected(object sender, EventArgs e)        {            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);            SubscribeInfo();        }        ///         /// 接收消息        ///         ///         ///         private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)        {            Console.WriteLine($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");        }        ///         /// 订阅消息        ///         public static void SubscribeInfo()        {            if (string.IsNullOrEmpty(topic))            {                Console.WriteLine("订阅主题不能为空!");                return;            }            if (!mqttClient.IsConnected)            {                Console.WriteLine("MQTT客户端尚未连接!");                return;            }            mqttClient.SubscribeAsync(new List {                new  TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)            });            Console.WriteLine($"已订阅[{topic}]主题" + Environment.NewLine);        }        ///         /// 退订消息        ///         public static void UnSubscribeInfo()        {             if (string.IsNullOrEmpty(topic))            {                Console.WriteLine("退订主题不能为空!");                return;            }            if (!mqttClient.IsConnected)            {                Console.WriteLine("MQTT客户端尚未连接!");                return;            }            mqttClient.UnsubscribeAsync(topic);            Console.WriteLine($"已退订[{topic}]主题" + Environment.NewLine);        }    }}
复制代码
View Code发布端:
  1. using MQTTnet;using MQTTnet.Client;using System;using System.Text;using System.Threading;using System.Threading.Tasks;namespace swapPublish{    class Program    {        private static MqttClient mqttClient = null;        private static string topic = "test123ABC";        private static IMqttClientOptions Options        {            get            {                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();                builder.WithCleanSession(false);                //用户名 密码                builder.WithCredentials("", "");                var id = Guid.NewGuid().ToString();                builder.WithClientId(id);                builder.WithTcpServer("127.0.0.1", 1883);                return builder.Build();            }        }        static async Task  Main(string[] args)        {            MqttFactory factory = new MqttFactory();            if (mqttClient == null)            {                mqttClient = (MqttClient)factory.CreateMqttClient();                 mqttClient.Connected += MqttClient_Connected;                mqttClient.Disconnected += async(s, e) =>                {                    Console.WriteLine("尝试重连!" + Environment.NewLine);                    await ConnectToServer();                };            }           await  ConnectToServer();            Console.WriteLine("已断开MQTT连接!" + Environment.NewLine);            Console.ReadLine();        }        ///         /// 连接MQTT服务器        ///         private static async Task ConnectToServer()        {            try            {                var res = await mqttClient.ConnectAsync(Options);            }            catch (Exception ex)            {                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);            }        }        ///         /// 连接MQTT服务器触发        ///         ///         ///         private static void MqttClient_Connected(object sender, EventArgs e)        {            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);            for(int i = 0; i < 10; i++)            {                var tak = PublishInfo();                 Thread.Sleep(2000);            }                  }        private static async  Task PublishInfo( )        {             if (string.IsNullOrEmpty(topic))            {               Console.WriteLine("发布主题不能为空!");                return;            }            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder();             builder.WithPayload(Encoding.UTF8.GetBytes(inputString));            builder.WithTopic(topic);            builder.WithRetainFlag(false);            builder.WithExactlyOnceQoS();            await mqttClient.PublishAsync(builder.Build());        }    }}
复制代码
View Code 如何只允许一个客户端消费同一个消息,暂时未解决!
大家有解决方法,请贴出评论。谢谢
MQTTnet  3.0.16 版本的使用
客户端:
  1. using MQTTnet;using MQTTnet.Adapter;using MQTTnet.Client;using MQTTnet.Client.Connecting;using MQTTnet.Client.Disconnecting;using MQTTnet.Client.Options;using MQTTnet.Client.Receiving;using MQTTnet.Protocol;using System;using System.Collections.Generic;using System.Text;using System.Threading.Tasks;namespace mqttsub{    class Program    {        static async Task Main(string[] args)        {            MqttClient mqtt = new MqttClient();            await mqtt.StartAsync();            Console.ReadKey();        }    }    public class MqttClient    {        private IMqttClient client;         private IMqttClientOptions options;        MqttClientDto model =null;        public MqttClient()        {            model = new MqttClientDto            {                Account = "",                PassWord = "",                ClientId = Guid.NewGuid().ToString(),                IP = "",                Port = 1883,                Topic="test/+/ABC" //通配符模式 该模式匹配 test/123/ABC  testABC  test/DDDDD/ABC 等            };        }        public async Task StartAsync()        {            try            {                client = new MqttFactory().CreateMqttClient();                var build = new MqttClientOptionsBuilder()                //配置客户端Id                .WithClientId(Guid.NewGuid().ToString())                //配置登录账号                .WithCredentials(model.Account,model.PassWord)                //配置服务器IP端口 这里得端口号是可空的                .WithTcpServer(model.IP, 1883)                .WithCleanSession();                options = build.Build();                //收到服务器发来消息                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);                //client.UseApplicationMessageReceivedHandler(args=> {                //    Console.WriteLine("===================================================");                //    Console.WriteLine("收到消息:");                //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");                //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");                //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");                //    Console.WriteLine();                //});                //连接成功                 client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);                //client.UseConnectedHandler(args=> {                //    Console.WriteLine("本客户端已连接成功");                //    Console.WriteLine($"地址:{model.IP}");                //    Console.WriteLine($"端口:{model.Port}");                //    Console.WriteLine($"客户端:{model.ClientId}");                //    Console.WriteLine($"账号:{model.Account}");                //    Console.WriteLine();                //    //第1种订阅方式                //    client.SubscribeAsync("主题名称").GetAwaiter().GetResult();                //    //第2种订阅方式                //    List Topics = new List();                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称A", QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce });                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称B" });                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称C" });                //    client.SubscribeAsync(Topics.ToArray()).GetAwaiter().GetResult();                //    //第3种订阅方式                //    MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();                //    builder.WithTopicFilter("AAA");                //    client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();                //});                //断开连接 重连就写在此处                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);                //client.UseDisconnectedHandler(args =>                //{                //    Console.WriteLine("本客户端已经断开连接");                //    Console.WriteLine();                //    try                //    {                //        client.ConnectAsync(options).GetAwaiter().GetResult();                //    }                //    catch (Exception ex)                //    {                //        Console.WriteLine("重连失败");                //    }                //});                //客户端发送消息                //await client.PublishAsync("你想要的主题", "你需要发送的东西");                //await client.PublishAsync("你想要的主题", Encoding.UTF8.GetBytes("你需要发送的东西").ToList());                //连接                await client.ConnectAsync(options);            }            catch (MqttConnectingFailedException)            {                Console.WriteLine("身份校验失败");            }            catch (Exception ex)            {                Console.WriteLine("出现异常");                Console.WriteLine(ex.Message);            }        }        ///         /// 客户端断开连接后,如果需要重连在此处实现        ///         ///         private async void DisconnectedHandler(MqttClientDisconnectedEventArgs obj)        {            Console.WriteLine("本客户端已经断开连接");            Console.WriteLine();            try            {                await client.ConnectAsync(options);            }            catch (Exception)            {                Console.WriteLine("重连失败");            }        }        ///         /// 连接成功 在此处做订阅主题(Topic)操作        ///         ///         private async void ConnectedHandler(MqttClientConnectedEventArgs obj)        {            Console.WriteLine("本客户端已连接成功");            Console.WriteLine($"地址:{model.IP}");            Console.WriteLine($"端口:{model.Port}");            Console.WriteLine($"客户端:{model.ClientId}");            Console.WriteLine($"账号:{model.Account}");            Console.WriteLine();            //第1种订阅方式            // client.SubscribeAsync("主题名称").GetAwaiter().GetResult();            //第2种订阅方式            List Topics = new List();            Topics.Add(new MqttTopicFilter() { Topic = model.Topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce});            //Topics.Add(new MqttTopicFilter() { Topic = "主题名称B" });            //Topics.Add(new MqttTopicFilter() { Topic = "主题名称C" });            await client.SubscribeAsync(Topics.ToArray());            //第3种订阅方式            //MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();            //builder.WithTopicFilter("AAA");            //client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();        }        ///         /// 收到消息        ///         ///         private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)        {            Console.WriteLine("===================================================");            Console.WriteLine("收到消息:");            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");            Console.WriteLine($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");            Console.WriteLine();        }    }    public class MqttClientDto    {        ///         /// 连接地址        ///         public string IP { get; set; }        ///         /// 账号        ///         public string Account { get; set; }        ///         /// 密码        ///         public string PassWord { get; set; }        ///         /// 客户端Id        ///         public string ClientId { get; set; }        public int Port { get; set; }        public string Topic { get; set; }    }}
复制代码
View Code服务端:
  1. using MQTTnet;using MQTTnet.Client.Receiving;using MQTTnet.Protocol;using MQTTnet.Server;using System;using System.Net;using System.Text;using System.Threading.Tasks;namespace MqttPub{    class Program    {        static async Task Main(string[] args)        {            await new ServerDome(). StartAsync();            Console.Read();        }    }    public class ServerDome      {        private IMqttServer server;        MqttClientDto model = null;        public ServerDome()        {            model = new MqttClientDto            {                Account = "",                PassWord = "",                ClientId = Guid.NewGuid().ToString(),                IP = "",                Port = 1883,                Topic = "test"            };        }        public async Task StartAsync()        {            if (server == null || !server.IsStarted)            {                server = new MqttFactory().CreateMqttServer();                MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();                //、默认监听端口                 serverOptions.WithDefaultEndpointPort(model.Port);                //校验客户端信息                serverOptions.WithConnectionValidator(client => {                    string Account = client.Username;                    string PassWord = client.Password;                    string clientid = client.ClientId;                    if (Account == "" && PassWord == "")                    {                        client.ReasonCode = MqttConnectReasonCode.Success;                        Console.WriteLine("校验成功");                    }                    else                    {                        client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;                        Console.WriteLine("校验失败");                    }                });                //客户端发送消息监听                server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);                //server.UseApplicationMessageReceivedHandler(args=>{                //    Console.WriteLine("===================================================");                //    Console.WriteLine("收到消息:");                //    Console.WriteLine($"客户端:{args.ClientId}");                //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");                //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");                //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");                //    Console.WriteLine();                //});                //客户端连接事件                server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler);                //server.UseClientConnectedHandler(args =>                //{                //    Console.WriteLine($"{args.ClientId}此客户端已经连接到服务器");                //});                //客户端断开连接事件                server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler);                //server.UseClientDisconnectedHandler(args => {                //    Console.WriteLine($"断开连接的客户端:{args.ClientId}");                //    Console.WriteLine($"断开连接类型:{args.DisconnectType.ToString()}");                //});                //客户端订阅主题事件                server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler);                //客户端取消订阅主题事件                server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler);                //服务器启动事件                server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler);                //服务器停止事件                server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler);                //服务端发送数据                //await  server.PublishAsync("你想要的主题","你需要发送的东西");                //var mqttApplicationMessage = new MqttApplicationMessage();                //mqttApplicationMessage.Topic = "你想要的主题";                //mqttApplicationMessage.Payload = Encoding.ASCII.GetBytes("你需要发送的东西");                //await server.PublishAsync(mqttApplicationMessage);                //启动服务器                await server.StartAsync(serverOptions.Build());            }        }        public async Task StopAsync()        {            if (server != null)            {                if (server.IsStarted)                {                    await server.StopAsync();                    server.Dispose();                }            }        }        ///         /// 客户端取消订阅主题        ///         ///         private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)        {            Console.WriteLine($"客户端:{obj.ClientId}");            Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");        }        ///         /// 客户端订阅的主题        ///         ///         private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)        {            Console.WriteLine($"客户端:{obj.ClientId}");            Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");        }        ///         /// 客户端断开连接        ///         ///         private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)        {            Console.WriteLine($"断开连接的客户端:{obj.ClientId}");            Console.WriteLine($"断开连接类型:{obj.DisconnectType.ToString()}");        }        ///         /// 客户端连接到服务器事件        ///         ///         private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)        {            throw new NotImplementedException();        }        ///         /// 收到各个客户端发送的消息        ///         ///         private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)        {            Console.WriteLine("===================================================");            Console.WriteLine("收到消息:");            Console.WriteLine($"客户端:{obj.ClientId}");            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");            Console.WriteLine($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");            Console.WriteLine();        }        ///         /// MQTT启动服务器事件        ///         ///         private void StartedHandler(EventArgs obj)        {            Console.WriteLine($"程序已经启动!监听端口为:{model.Port}");        }        ///         /// MQTT服务器停止事件        ///         ///         private void StoppedHandler(EventArgs obj)        {            Console.WriteLine("程序已经关闭");        }    }    public class MqttClientDto    {        ///         /// 连接地址        ///         public string IP { get; set; }        ///         /// 账号        ///         public string Account { get; set; }        ///         /// 密码        ///         public string PassWord { get; set; }        ///         /// 客户端Id        ///         public string ClientId { get; set; }        public int Port { get; set; }        public string Topic { get; set; }    }}
复制代码
View Code这里说明下如何使用通配符
例如,发送 topic 主题为:test/123/ABC 或者 test/234/ABC ,消费者在订阅时,可以使用:test/+/ABC  来订阅该类消息。
通配符的作用为分组订阅、


 
 发布者发布内容为: test//status ,订阅者订阅的为:test/+/status

 
 当然,发布者也可以在 / / 之间增加内容,例如设备号:

 

  • 主题名不能使用通配符, 但是主题过滤器中可以使用通配符.因此,订阅者可以通过过滤器接合通配符订阅一类消息

 以MQTTnet  3.0.16 为例,开启自动确认,开启不保留最后一跳消息。

 

来源:https://www.cnblogs.com/chenwolong/archive/2023/03/21/17239760.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具