翼度科技»论坛 云主机 LINUX 查看内容

RabbitMQ学习笔记02:Hello World!

9

主题

9

帖子

27

积分

新手上路

Rank: 1

积分
27
参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 
 
 
前言

RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在于RabbitMQ投递的是二进制的消息数据。
这里有一些术语需要说明。
发送、产生消息的程序我们称之为生产者producer,使用此图标表示

队列queue就是一个有名字的邮箱,虽然消息会在RabbitMQ和你的应用程序之间流动,但是它们可以持久保存在队列中。队列会收到主机的内存和磁盘容量的限制。多个生产者可以向一个队列发送消息,同时多个消费者consumer也可以从一个队列中消费/接收消息。
队列的图标如下

接收、消费消息的程序我们称之为消费者consumer,使用此图标表示

注意,生产者、消费者以及RabbitMQ不需要位于同一台服务器上。事实上,绝大多数情况中,它们都分别位于不同的服务器上。生产者和消费者可以是同一个程序实现。
这里我们的实验环境,生产者和消费者是独立的程序文件,但是它们以及RabbitMQ都位于同一台服务器上。
 
 
Hello World!

学习RabbitMQ,其中的生产者和消费者需要用户自己使用编程语言来实现。我本人是一名运维工程师,有过bash, awk的经验,2017年学习廖雪峰的Python学了一半,近期又学习了B站高淇老师的Java前三章(包含面向对象),对于Python和Java语言算是一知半解,大概看得懂又写不出来的水平。我自己尝试了下,即使没有编程基础,只要我们严格按照官方的教程指导,也是可以将这些代码实现的。
这部分我们使用python手写生产者和消费者,使用RabbitMQ自带的guest账户。生产者会发送一条简单的消息给到名为hello的队列,消费者从队列会收到这条消息并将其打印出来。
简单的流程图如图所示,队列就有点类似于代表了消费者去接收了这条消息。

我们的操作系统自带了python 3.9,无论是键入python又或者是python3,都是指向python3,这点可以从字符链接里面看出来。
  1. [root@rabbitmq-01 ~]# ls -l $(which python)
  2. lrwxrwxrwx. 1 root root 9 Dec 12 20:51 /usr/bin/python -> ./python3
  3. [root@rabbitmq-01 ~]# ls -l $(which python3)
  4. lrwxrwxrwx. 1 root root 9 Dec 12 20:42 /usr/bin/python3 -> python3.9
复制代码
我们需要使用到 Pika 库来使得我们的python代码可以连接RabbitMQ,这个库也是官方推荐的。
  1. python -m pip install pika --upgrade
复制代码
如果没有安装pip的话,可以使用
  1. python -m ensurepip --upgrade
复制代码
接下来就可以开始正式写python代码了。
Sending


我们的第一个程序send.py会发送一条消息到队列hello中。首先我们需要建立连接。
  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
复制代码
连接建立完毕,在发送消息之前,我们需要确保队列hello必须存在,否则我们发送出去的消息就会被丢弃。因此我们创建队列。
  1. channel.queue_declare(queue='hello')
复制代码
此时我们可以开始发送消息了,我们计划发送消息Hello World!到队列hello中去。
在mq中,消息无法直接发送到队列中去,必须要经过exchange。目前我们还不需要对其进行展开理解,大家直到这么个概念即可,后续在讲解RabbitMQ tutorial - Publish/Subscribe — RabbitMQ的时候会涉及到。
我们现在只需要将其发送给默认的exchange,使用空字符串来识别它。它是一个特殊的exchange,允许我们指定要发送消息的队列,队列使用routing_key参数来指定。
  1. channel.basic_publish(exchange='',
  2.                       routing_key='hello',
  3.                       body='Hello World!')
  4. print(" [x] Sent 'Hello World!'")
复制代码
这里的print只是用来告诉用户我们做了什么。
消息发送完毕了,在我们退出程序前,我们应该确保网络buffer已经被清空,我们的消息真的发出去了。这里通过优雅地关闭连接来实现。
  1. connection.close()
复制代码
发送到这里就结束了,如果消息没有发送成功的话,可以考虑从日志入手排查问题。
Receiving


第二个程序是消费者receive.py,它将会从队列中接收消息并将其打印出来。
首先我们需要连接到服务器上,这部分代码和生产者代码相同。
接下来,同样我们需要确保队列的存在性。使用queue_declare创建队列是幂等(idempotent)的,即使执行多次也只会创建一个队列。
  1. channel.queue_declare(queue='hello')
复制代码
之所以在这里重复创建队列,是为了

  • 我们可能不知道生产者和消费者程序,哪个会先运行,因此最好在两端都创建队列。
  • 创建队列的操作需要具备幂等性。
从队列中接收一个消息会更加复杂,它工作方式是订阅一个callback函数到队列上。每当队列中有消息出现的时候,callback函数都会被Pika库调用。在本案例中,callback函数会打印消息在屏幕上。
  1. def callback(ch, method, properties, body):
  2.     print(" [x] Received %r" % body)
复制代码
接下来我们告诉mq,这个特定的callback函数需要从队列hello接收消息。这一步需要确保队列已存在,好在我们上面的代码已经确保过了。
  1. channel.basic_consume(queue='hello',
  2.                       auto_ack=True,
  3.                       on_message_callback=callback)
复制代码
auto_ack参数将会在RabbitMQ tutorial - Work Queues — RabbitMQ中讲解。
接下来我们进入无限循环,在循环中我们等待队列中一旦出现新的消息,我们就会将其消费掉然后输出消息的内容。
直到用户输入Ctrl+C来停止程序。
  1. print(' [*] Waiting for messages. To exit press CTRL+C')
  2. channel.start_consuming()
复制代码
  1. if __name__ == '__main__':
  2.     try:
  3.         main()
  4.     except KeyboardInterrupt:
  5.         print('Interrupted')
  6.         try:
  7.             sys.exit(0)
  8.         except SystemExit:
  9.             os._exit(0)
复制代码
Putting it all together

send.py
  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(
  4.     pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.queue_declare(queue='hello')
  7. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
  8. print(" [x] Sent 'Hello World!'")
  9. connection.close()
复制代码
receive.py
  1. #!/usr/bin/env pythonimport pika, sys, osdef main():    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))    channel = connection.channel()    channel.queue_declare(queue='hello')    def callback(ch, method, properties, body):        print(" [x] Received %r" % body.decode())    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)    print('
  2. [*] Waiting for messages. To exit press CTRL+C')    channel.start_consuming()if __name__ == '__main__':
  3.     try:
  4.         main()
  5.     except KeyboardInterrupt:
  6.         print('Interrupted')
  7.         try:
  8.             sys.exit(0)
  9.         except SystemExit:
  10.             os._exit(0)
复制代码
把python代码文件放到服务器上,准备开始测试。
首先打开第一个终端,运行消费者程序receive.py,它会占用前端一直运行下去,一旦有消息就会打印,直到来自用户的终止指令Ctrl+C
  1. [root@rabbitmq-01 code]# python receive.py
  2. [*] Waiting for messages. To exit press CTRL+C
复制代码
打开第二个终端,运行生产者程序send.py,每运行一次它都会向队列hello发送一条消息Hello World!同时输出到终端,随后它就会退出,不会占用前端资源。
  1. [root@rabbitmq-01 code]# python send.py
  2. [x] Sent 'Hello World!'
复制代码
此时我们回到消费者终端,它会按照程序中说的消费掉队列中的消息并将其输出。
  1. [root@rabbitmq-01 code]# python receive.py
  2. [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'
复制代码
send.py可以反复执行。
  1. [root@rabbitmq-01 code]# python send.py
  2. [x] Sent 'Hello World!'[root@rabbitmq-01 code]# python send.py
  3. [x] Sent 'Hello World!'[root@rabbitmq-01 code]# python send.py
  4. [x] Sent 'Hello World!'[root@rabbitmq-01 code]# python send.py
  5. [x] Sent 'Hello World!'
复制代码
消费者程序都可以捕获。
  1. [root@rabbitmq-01 code]# python receive.py
  2. [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!' [x] Received 'Hello World!' [x] Received 'Hello World!' [x] Received 'Hello World!'
复制代码
我们可以通过rabbitmqctl list_queues来查看mq实例中当前存在的队列以及队列中的消息数量。
  1. [root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
  2. Timeout: 60.0 seconds ...
  3. Listing queues for vhost / ...
  4. name        messages
  5. hello        0
复制代码
因为消息在一瞬间就被消费掉了,所以我们看到的消息数量都会是0。
想要退出的话就是在消费中终端执行Ctrl+C
  1. [x] Received 'Hello World!'
  2. [x] Received 'Hello World!'
  3. ^CInterrupted
复制代码
 
本片文章的内容就到此为止了,我们学会了RabbitMQ的基本概念(生产者、消费者和队列)。

来源:https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_one.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具