翼度科技»论坛 编程开发 .net 查看内容

01_在NET中使用RabbitMQ

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
1.Linux上安装Docken
  1. 服务器系统版本以及内核版本:cat /etc/redhat-release
  2. 查看服务器内核版本:uname -r
  3. 安装依赖包:yum install -y yum-utils device-mapper-persistent-data lvm2
  4. 设置阿里云镜像源:yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
  5. 安装Docker:yum install -y docker-ce
  6.     社区版(Community Edition,缩写为 CE)
  7.     企业版(Enterprise Edition,缩写为 EE)  
  8. 启动docker并设置开机自启:
  9.     启动docker命令:systemctl start docker
  10.     设置开机自启命令:systemctl enable docker
  11.     查看docker版本命令:docker version<br>
复制代码
  删除docker-ce命令:yum remove docker-ce
  删除镜像、容器、配置文件等内容
  rm -rf /var/lib/containerd
  rm -rf /var/lib/docker
  1. <br><br>----------------------------------------通过docker help命令来查看更多的命令--------------------------------<br>
复制代码
  docker search --镜像名 搜索仓库镜像
  docker pull --镜像名 拉取镜像
  docker ps 查看目前正在运行的所有容器 (-a 显示包括已经停止的容器)
  docker rmi image_id/image_name 删除镜像
  docker build 使用Dockerfile创建镜像
  docker run 运行容器
  docker exec 进入容器中执行命令 (例如:docker exec -it container_id/container_name /bin/bash)
  docker logs container_id/container_name 查看容器日志(例如:docker logs -f -t --tail 10 container_id )
  docker start container_id/container_name 启动容器
  docker restart container_id/container_name 重启容器
  docker stop container_id/container_name 停止容器
  docker rm container_id/container_name 删除容器(只能删除已停止的容器)
 
2基于Docken安装RabbitMq
  docker启动:systemctl start docker
  docker重启:ystemctl  restart docker
  docker关闭:systemctl  stop docker
  1. 查看正在运行容器:docker ps <br><br>查询Rabbitmq镜像:  docker search rabbitmq  <br>安装Rabbitmq镜像:<br>     指定版本:docker pull rabbitmq:3.7.7-management<br>     最新版本:docker pull rabbitmq<br>创建和启动容器:docker run -d --hostname myrabbitmq --name rabbitmq -p 5672:5672 -p 15673:15672 rabbitmq<br>     -d 后台运行容器;<br>     --hostname  主机名;<br>     --name 指定容器名;<br>     -p 指定服务运行的端口<br>     5672 控制台Web端口号(服务端)<br>     15672  应用访问端口(客户端)<br>     -v 映射目录或文件<br>     -e 指定环境变量(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
复制代码
  1. 进入容器内部:docker exec -it 容器id /bin/bash<br>运行:rabbitmq-plugins enable rabbitmq_management<br>重启rabbitmq:docker start rabbitmq<br>重启容器:docker restart rabbitmq<br>停止容器:docker stop rabbitmq<br><br>访问:http://ip:15672/<br>账号密码:guest/guest<br><br>其它命令:
复制代码
  列出所有用户:rabbitmqctl list_users
  添加用户:rabbitmqctl add_user username password   如:新增一个用户:rabbitmqctl add_user 名称 密码
  删除用户:rabbitmqctl delete_user username
  修改密码:rabbitmqctl change_password username newpassword
  列出用户权限:rabbitmqctl list_user_permissions username
  列出虚拟主机上的所有权限:rabbitmqctl list_permissions -p vhostpath
  设置用户权限:rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*”    如:设置用户权限:rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
 
3.添加用户和设置权限

 
 


 
 
4.NET中使用RabbitMQ
RabbitMq有7种模式:RabbitMQ Tutorials | RabbitMQ
安装包:RabbitMQ.Client
 
定义队列和交换机名称
  1.     /// <summary>
  2.     /// 定义队列和交换机名称
  3.     /// </summary>
  4.     public class RabbitConstant
  5.     {
  6.         public const string QUEUE_HELLO_WORLD = "helloworld.queue";
  7.         public const string QUEUE_SMS = "sms.queue";
  8.         public const string EXCHANGE_WEATHER = "weather.exchange";
  9.         public const string QUEUE_BAIDU = "baidu.queue";
  10.         public const string QUEUE_SINA = "sina.queue";
  11.         public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange";
  12.         public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange";
  13.     }
复制代码
 
 
第一种模式:Hello World
消费者:
  1. using RabbitMQ.Client;
  2. using RabbitMQ.Client.Events;
  3. public class HelloConsumer
  4.     {
  5.         public static void HelloWorldShow()
  6.         {
  7.             var factory = new ConnectionFactory();
  8.             factory.HostName = "127.0.0.1";
  9.             factory.Port = 5672;//5672是RabbitMQ默认的端口号
  10.             factory.UserName = "admin";
  11.             factory.Password = "admin";
  12.             factory.VirtualHost = "my_vhost";
  13.             using (var connection = factory.CreateConnection())
  14.             {
  15.                 using (var channel = connection.CreateModel())
  16.                 {
  17.                     /*
  18.                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  19.                      * 第一个参数:队列名称ID
  20.                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  21.                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  22.                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
  23.                      * 其他额外参数为null
  24.                      */
  25.                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
  26.                     Console.ForegroundColor = ConsoleColor.Cyan;
  27.                     //事件消费者类
  28.                     EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
  29.                     // 触发事件
  30.                     consumers.Received += (model, ea) =>
  31.                     {
  32.                         var body = ea.Body.ToArray();
  33.                         var message = Encoding.UTF8.GetString(body);
  34.                         // false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
  35.                         channel.BasicAck(ea.DeliveryTag, false);
  36.                         Console.WriteLine($"Consumer01接收消息:{message}");
  37.                     };
  38.                     /*
  39.                      * 从MQ服务器中获取数据
  40.                      * 创建一个消息消费者
  41.                      * 第一个参数:队列名
  42.                      * 第二个参数:是否自动确认收到消息,false代表手动确认消息,这是MQ推荐的做法
  43.                      * 第三个参数:要传入的IBasicConsumer接口
  44.                      */
  45.                     channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);
  46.                     Console.WriteLine("Press [Enter] to exit");
  47.                     Console.Read();
  48.                 }
  49.             }
  50.         }
  51.     }
复制代码
生产者:
  1. using RabbitMQ.Client;
  2. public class HelloProducer
  3.     {
  4.         public static void HelloWorldShow()
  5.         {
  6.             var factory = new ConnectionFactory();
  7.             factory.HostName = "127.0.0.1";//IP
  8.             factory.Port = 5672;//端口
  9.             factory.UserName = "admin";//用户名
  10.             factory.Password = "admin";//密码
  11.             factory.VirtualHost = "my_vhost";//虚拟主机
  12.             // 获取TCP 长连接
  13.             using (var connection = factory.CreateConnection())
  14.             {
  15.                 // 创建通信“通道”,相当于TCP中的虚拟连接
  16.                 using (var channel = connection.CreateModel())
  17.                 {
  18.                     /*
  19.                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  20.                      * 第一个参数:队列名称ID
  21.                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  22.                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  23.                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
  24.                      * 其他额外参数为null
  25.                      */
  26.                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
  27.                     Console.ForegroundColor = ConsoleColor.Red;
  28.                     string message = "hello CodeMan 666";//要发送的数据
  29.                     var body = Encoding.UTF8.GetBytes(message);
  30.                     /*
  31.                      * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到
  32.                      * 第二个参数:路由key
  33.                      * 第三个参数:额外的设置属性
  34.                      * 第四个参数:最后一个参数是要传递的消息字节数组
  35.                      */
  36.                     channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body);
  37.                     Console.WriteLine($"producer消息:{message}已发送");
  38.                 }
  39.             }
  40.         }
  41.     }
复制代码
 
-------------------------------------------------------------------------------漂亮的分割线--------------------------------------------------------------------------------------------------
获取ConnectionFactory 对象
  1. /// <summary>
  2.     /// RabbitMQ连接类
  3.     /// </summary>
  4.     public class RabbitUtils
  5.     {
  6.         /// <summary>
  7.         /// 获取ConnectionFactory对象
  8.         /// </summary>
  9.         /// <returns></returns>
  10.         public static ConnectionFactory GetConnection()
  11.         {
  12.             var factory = new ConnectionFactory();
  13.             factory.HostName = "127.0.0.1";//IP地址
  14.             factory.Port = 5672;//5672是RabbitMQ默认的端口号
  15.             factory.UserName = "admin";//用户名
  16.             factory.Password = "admin";//密码
  17.             factory.VirtualHost = "my_vhost";//虚拟主机
  18.             return factory;
  19.         }
  20.     }
复制代码
 
  1.     /// <summary>
  2.     /// 发送消息内容类
  3.     /// </summary>
  4.     public class Sms
  5.     {
  6.         public string Name { get; set; }
  7.         public string Mobile { get; set; }
  8.         public string Content { get; set; }
  9.         public Sms()
  10.         {
  11.         }
  12.         public Sms(string name, string mobile, string content)
  13.         {
  14.             Name = name;
  15.             Mobile = mobile;
  16.             Content = content;
  17.         }
  18.     }
复制代码
 
 
第二种模式:Work Queues
消费者1
  1. public class SmsReceive
  2.     {
  3.         public static void Sender()
  4.         {
  5.             var connection = RabbitUtils.GetConnection().CreateConnection();
  6.             var channel = connection.CreateModel();
  7.             /*
  8.              * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  9.              * 第一个参数:队列名称ID
  10.              * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  11.              * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  12.              * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
  13.              * 其他额外参数为null
  14.              */
  15.             channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
  16.             // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  17.             // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  18.             channel.BasicQos(0, 1, false);
  19.             var consumer = new EventingBasicConsumer(channel);
  20.             consumer.Received += (model, ea) =>
  21.             {
  22.                 var body = ea.Body.ToArray();
  23.                 var message = Encoding.UTF8.GetString(body);
  24.                 Thread.Sleep(30);
  25.                 Console.WriteLine($"SmsSender-发送短信成功:{message}");
  26.                 channel.BasicAck(ea.DeliveryTag, false);
  27.             };
  28.             channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
  29.             Console.WriteLine("Press [Enter] to exit");
  30.             Console.Read();
  31.         }
  32.     }
复制代码
消费者2
  1. public class SmsReceive
  2.     {
  3.         public static void Sender()
  4.         {
  5.             var connection = RabbitUtils.GetConnection().CreateConnection();
  6.             var channel = connection.CreateModel();
  7.             channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
  8.             // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
  9.             // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
  10.             channel.BasicQos(0, 1, false);//处理完一个取一个
  11.             var consumer = new EventingBasicConsumer(channel);
  12.             consumer.Received += (model, ea) =>
  13.             {
  14.                 var body = ea.Body.ToArray();
  15.                 var message = Encoding.UTF8.GetString(body);
  16.                 Thread.Sleep(60);
  17.                 Console.WriteLine($"SmsSender-发送短信成功:{message}");
  18.                 channel.BasicAck(ea.DeliveryTag, false);
  19.             };
  20.             channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
  21.             Console.WriteLine("Press [Enter] to exit");
  22.             Console.Read();
  23.         }
  24.     }
复制代码
生产者
  1. public class SmsSender
  2.     {
  3.         public static void Sender()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     /*
  10.                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  11.                      * 第一个参数:队列名称ID
  12.                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  13.                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  14.                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
  15.                      * 其他额外参数为null
  16.                      */
  17.                     channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
  18.                     for (int i = 0; i < 100; i++)
  19.                     {
  20.                         Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功");
  21.                         string jsonSms = JsonConvert.SerializeObject(sms);
  22.                         var body = Encoding.UTF8.GetBytes(jsonSms);
  23.                         /*
  24.                          * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到
  25.                          * 第二个参数:路由key
  26.                          * 第三个参数:额外的设置属性
  27.                          * 第四个参数:最后一个参数是要传递的消息字节数组
  28.                          */
  29.                         channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body);
  30.                         Console.WriteLine($"正在发送内容:{jsonSms}");
  31.                     }
  32.                     Console.WriteLine("发送数据成功");
  33.                 }
  34.             }
  35.         }
  36.     }
复制代码
 
第三种模式:Publish/Subscribe
消费者1
  1. public class WeatherFanout
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     //交换机
  10.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
  11.                     // 声明队列信息
  12.                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  13.                     /*
  14.                      * queueBind 用于将队列与交换机绑定
  15.                      * 参数1:队列名
  16.                      * 参数2:交换机名
  17.                      * 参数3:路由Key(暂时用不到)
  18.                      */
  19.                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  20.                     channel.BasicQos(0, 1, false);
  21.                     var consumer = new EventingBasicConsumer(channel);
  22.                     consumer.Received += ((model, ea) =>
  23.                     {
  24.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  25.                         Console.WriteLine($"百度收到的气象信息:{message}");
  26.                         channel.BasicAck(ea.DeliveryTag, false);
  27.                     });
  28.                     channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  29.                     Console.WriteLine("Press [Enter] to exit");
  30.                     Console.Read();
  31.                 }
  32.             }
  33.         }
  34.     }
复制代码
消费者2
  1. public class WeatherFanout
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
  10.                     // 声明队列信息
  11.                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  12.                     /*
  13.                      * queueBind 用于将队列与交换机绑定
  14.                      * 参数1:队列名
  15.                      * 参数2:交换机名
  16.                      * 参数3:路由Key(暂时用不到)
  17.                      */
  18.                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
  19.                     channel.BasicQos(0, 1, false);
  20.                     var consumer = new EventingBasicConsumer(channel);
  21.                     consumer.Received += ((model, ea) =>
  22.                     {
  23.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                         Console.WriteLine($"百度收到的气象信息:{message}");
  25.                         channel.BasicAck(ea.DeliveryTag, false);
  26.                     });
  27.                     channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  28.                     Console.WriteLine("Press [Enter] to exit");
  29.                     Console.Read();
  30.                 }
  31.             }
  32.         }
  33.     }
复制代码
生产者
  1. public class WeatherFanout
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     string message = "20度";
  10.                     var body = Encoding.UTF8.GetBytes(message);
  11.                     /*
  12.                      * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到
  13.                      * 第二个参数:路由key
  14.                      * 第三个参数:额外的设置属性
  15.                      * 第四个参数:最后一个参数是要传递的消息字节数组
  16.                      */
  17.                     channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
  18.                     Console.WriteLine("天气信息发送成功!");
  19.                 }
  20.             }
  21.         }
  22.     }
复制代码
 
第三种模式:Routing
消费者1
  1. public class WeatherDirect
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     //交换机
  10.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  11.                     //队列
  12.                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  13.                     /*
  14.                     * queueBind 用于将队列与交换机绑定
  15.                     * 参数1:队列名
  16.                     * 参数2:交换机名
  17.                     * 参数3:路由Key(暂时用不到)
  18.                     */
  19.                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
  20.                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  21.                     channel.BasicQos(0, 1, false);
  22.                     var consumer = new EventingBasicConsumer(channel);
  23.                     consumer.Received += ((model, ea) =>
  24.                     {
  25.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  26.                         Console.WriteLine($"百度收到的气象信息:{message}");
  27.                         channel.BasicAck(ea.DeliveryTag, false);
  28.                     });
  29.                     channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  30.                     Console.WriteLine("Press [Enter] to exit");
  31.                     Console.Read();
  32.                 }
  33.             }
  34.         }
  35.     }
复制代码
消费者2
  1. public class WeatherDirect
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     //交换机
  10.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  11.                     // 声明队列信息
  12.                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  13.                     /*
  14.                      * queueBind 用于将队列与交换机绑定
  15.                      * 参数1:队列名
  16.                      * 参数2:交换机名
  17.                      * 参数3:路由Key
  18.                      */
  19.                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
  20.                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
  21.                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  22.                     channel.BasicQos(0, 1, false);
  23.                     var consumer = new EventingBasicConsumer(channel);
  24.                     consumer.Received += ((model, ea) =>
  25.                     {
  26.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  27.                         Console.WriteLine($"新浪收到的气象信息:{message}");
  28.                         channel.BasicAck(ea.DeliveryTag, false);
  29.                     });
  30.                     channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  31.                     Console.WriteLine("Press [Enter] to exit");
  32.                     Console.Read();
  33.                 }
  34.             }
  35.         }
  36.     }
复制代码
生产者
  1. public class WeatherDirect
  2.     {
  3.         public static void Weather()
  4.         {
  5.             Dictionary<string, string> area = new Dictionary<string, string>();
  6.             area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.             area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.             area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.             area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.             {
  12.                 using (var channel = connection.CreateModel())
  13.                 {
  14.                     foreach (var item in area)
  15.                     {
  16.                         /*
  17.                          * 第一个参数:exchange:交换机,暂时用不到,在进行发布订阅时才会用到
  18.                          * 第二个参数:路由key
  19.                          * 第三个参数:额外的设置属性
  20.                          * 第四个参数:最后一个参数是要传递的消息字节数组
  21.                          */
  22.                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
  23.                             null, Encoding.UTF8.GetBytes(item.Value));
  24.                     }
  25.                     Console.WriteLine("气象信息发送成功!");
  26.                 }
  27.             }
  28.         }
  29.     }
复制代码
 
第五章模式:Topics
消费者1
  1. public class WeatherTopic
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     //交换机
  10.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
  11.                     // 声明队列信息
  12.                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  13.                     /*
  14.                      * queueBind 用于将队列与交换机绑定
  15.                      * 参数1:队列名
  16.                      * 参数2:交换机名
  17.                      * 参数3:路由Key(暂时用不到)
  18.                      */
  19.                     channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");//有关china的所有信息
  20.                     channel.BasicQos(0, 1, false);
  21.                     var consumer = new EventingBasicConsumer(channel);
  22.                     consumer.Received += ((model, ea) =>
  23.                     {
  24.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  25.                         Console.WriteLine($"百度收到的气象信息:{message}");
  26.                         channel.BasicAck(ea.DeliveryTag, false);
  27.                     });
  28.                     channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  29.                     Console.WriteLine("Press [Enter] to exit");
  30.                     Console.Read();
  31.                 }
  32.             }
  33.         }
  34.     }
复制代码
消费者2
  1. public class WeatherTopic
  2.     {
  3.         public static void Weather()
  4.         {
  5.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.             {
  7.                 using (var channel = connection.CreateModel())
  8.                 {
  9.                     //交换机
  10.                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
  11.                     // 声明队列信息
  12.                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  13.                     /*
  14.                      * queueBind 用于将队列与交换机绑定
  15.                      * 参数1:队列名
  16.                      * 参数2:交换机名
  17.                      * 参数3:路由Key(暂时用不到)
  18.                      */
  19.                     channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");//有关china.hubei.的信息
  20.                     channel.BasicQos(0, 1, false);
  21.                     var consumer = new EventingBasicConsumer(channel);
  22.                     consumer.Received += ((model, ea) =>
  23.                     {
  24.                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  25.                         Console.WriteLine($"新浪收到的气象信息:{message}");
  26.                         channel.BasicAck(ea.DeliveryTag, false);
  27.                     });
  28.                     channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  29.                     Console.WriteLine("Press [Enter] to exit");
  30.                     Console.Read();
  31.                 }
  32.             }
  33.         }
  34.     }
复制代码
生产者
  1. public class WeatherTopic
  2.     {
  3.         public static void Weather()
  4.         {
  5.             Dictionary<string, string> area = new Dictionary<string, string>();
  6.             area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.             area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.             area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.             area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.             using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.             {
  12.                 using (var channel = connection.CreateModel())
  13.                 {
  14.                     foreach (var item in area)
  15.                     {
  16.                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
  17.                             null, Encoding.UTF8.GetBytes(item.Value));
  18.                     }
  19.                     Console.WriteLine("气象信息发送成功!");
  20.                 }
  21.             }
  22.         }
  23.     }
复制代码
 

来源:https://www.cnblogs.com/MingQiu/p/18129505
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具