rabbitmq for python

  |  

因为最近在学习rabbitmq,所以这里记录一下rabbitmq的python客户端的使用。

MQ相关的概念

RabbitMQ 是一种分布式消息中间件,消息中间件也称消息队列MQ

MQ的基本概念

什么是MQ?MQ是消息队列的英文缩写,即Message
Queue,是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

message-queue

在互联网架构中,MQ是一种非常常见的上下游”逻辑解耦+物理解耦”的消息通信服务,用于上下游传递消息。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务

常见的MQ消息中间件有很多,例如ActiveMQ、RabbitMQ、Kafka、RocketMQ等等。

MQ能很好的解决一些复杂特殊的场景:

  1. 高并发的流量削峰

    假设某订单系统每秒最多能处理一万次订单,也就是最多承受的10000qps,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

    traffic-peak-shaving

  2. 应用解耦

    以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

    application-decoupling

  3. 异步处理

    有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A
    过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A
    服务。这两种方式都不是很优雅,使用消息队列,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B
    处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback
    api。同样B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。

    asynchronous-processing

  4. 分布式事务

    以订单服务为例,传统的方式为单体应用,支付、修改订单状态、创建物流订单三个步骤集成在一个服务中,因此这三个步骤可以放在一个事务中,要么全成功,要么全失败。而在微服务的环境下,会将三个步骤拆分成三个服务,例如:支付服务,订单服务,物流服务。三者各司其职,相互之间进行服务间调用,但这会带来分布式事务的问题,因为三个步骤操作的不是同一个数据库,导致无法使用事务管理以达到一致性。而
    MQ
    能够很好的帮我们解决分布式事务的问题,有一个比较容易理解的方案,就是二次提交。基于MQ的特点,MQ作为二次提交的中间节点,负责存储请求数据,在失败的情况可以进行多次尝试,或者基于MQ中的队列数据进行回滚操作,是一个既能保证性能,又能保证业务一致性的方案,如下图所示:

    distributed-transaction

  5. 数据分发

    MQ 具有发布订阅机制,不仅仅是简单的上游和下游一对一的关系,还有支持一对多或者广播的模式,并且都可以根据规则选择分发的对象。这样一份上游数据,众多下游系统中,可以根据规则选择是否接收这些数据,能达到很高的拓展性。

    data-distribution

常用的MQ

  1. Kafka

    大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS
    的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber,
    Twitter, Netflix 等大公司所采纳。

    • 优点:性能卓越,吞吐量高,单机写入 TPS 约在百万条/秒,时效性 ms 级,可用性非常高;其次 kafka
      是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据导致服务不可用,消费者采用 Pull
      方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。此外 kafka 有优秀的第三方 Kafka Web 管理界面
      Kafka-Manager,在日志领域比较成熟,被多家公司和多个开源项目使用;最后 kafka 在功能支持方便面它功能较为简单,主要支持简单的
      MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
    • 缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load
      越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
    • 选用场景:Kafka 主要特点是基于Pull
      的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选
      kafka 了。
  2. RocketMQ

    RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了
    Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

    • 优点:单机吞吐量十万级,可用性非常高,采用分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,扩展性好,支持 10
      亿级别的消息堆积,不会因为堆积导致性能下降,采用 java 语言实现。
    • 缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ核心中去实现 JMS
      等接口,有些系统要迁移需要修改大量代码。
    • 选用场景:天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ
      在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择
      RocketMQ。
  3. RabbitMQ

    2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    • 优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ
      功能比较完备、健壮、稳定、易用、跨平台、支持多种语言如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持
      AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
    • 缺点:商业版需要收费,学习成本较高。
    • 选用场景:结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的
      RabbitMQ。

消息队列协议

协议是在TCP/IP协议基础之上构建的种约定成的规范和机制,目的是让客户端进行沟通和通讯。并且这种协议下规范必须具有持久性高可用高可靠
的性能。

为什么不直接采用TCP/IP协议去传递消息?因为TCP/IP协议太过于简单,并不能承载消息的内容和载体,因此在此之上增加一些内容,给消息的传递分发高可用提供基础。

我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,是采用底层的TCP/IP,UDP协议还是在这基础上自己构建等,而这些约定成俗的规范就称之为:协议。

消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议

面试题:为什么消息中间件不直接使用http协议呢?

因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。

大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

常用消息中间件协议

  1. AMQP协议(Advanced Message Queuing Protocol—高级消息队列协议)

    它由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

    特性:分布式事务支、消息的持久化支持、高性能和高可靠的消息处理优势

    AMQP典型的实现者是RabbitMQ、ACTIVEMQ等

  2. MQTT协议(Message Queueing Telemetry Transport—消息队列遥测传输协议)

    它是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

    特点:轻量、结构简单、传输快、不支持事务、没有持久化设计

    应用场景:适用于计算能力有限、低带宽、网络不稳定的场景

    支持者:RabbitMQ、ACTIVEMQ(默认情况下关闭,需要打开)

  3. OpenMessage协议

    是近几年由阿里、雅虎和滴滴出行、 Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。

    特点:结构简单、解析速度快、支持事务和持久化设计

  4. Kafka协议

    基于TCP/IP的二进制协议。消息内部是通过长度来分割,由些基本数据类型组成。

    特点:结构简单、解析速度快、无事务支持、有持久化设计

消息队列持久化

持久化简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

message-queue-persistence

常见的持久化方式和对比:

ActiveMQ RabbitMQ Kafka RocketMQ
文件存储 支持 支持 支持 支持
数据库 支持 / / /

消息的分发策略

MQ消息队列有如下几个角色:

  • Producer:消息生产者。负责产生和发送消息到 Broker
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue
  • Consumer:消息消费者。负责从 Broker 中获取消息,并进行相应处理

生产者产生消息后,MQ进行存储,消费者获得消息无外乎外推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送过程,而这些推机制会适用到很多的业务场景,也有很多对应的推机制策略

场景分析一

比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,此时就需要一个消费策略,或称为消费的方法论。

message-dispatch-strategy

场景分析二

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发

message-dispatch-strategy-2

消息分发策略的机制和对比

ActiveMQ RabbitMQ Kafka RocketMQ
发布订阅 支持 支持 支持 支持
轮询分发 支持 支持 支持 /
公平分发 / 支持 支持 /
重发 支持 支持 / 支持
消息拉取 / 支持 支持 支持

消息队列的高可用和高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。

当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU、内存、磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。

  1. Master-slave主从共享数据的部署方式

    该模式将多个消息服务器Broker连接共享一块消息存储空间,其中Master节点负责消息的写入。客户端会将消息写入到Master节点,一旦Master挂掉,slave节点继续服务,从而形成高可用。

    message-queue-high-availability

  2. Master-slave主从同步部署方式

    该模式写入消息同样在Master节点上,但是主结点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类似。这样可以达到负载均衡的效果,如果消费者有多个,就可以到不同的节点进行消费,但是消息的拷贝和同步会占用很大的贷款和网络资源。在rabbitMQ中会有使用。

    message-queue-high-availability-2

  3. 多主集群同步部署模式

    和上述方式区别不大,但是该方式任意节点都可以进行写入。

    message-queue-high-availability-3

  4. 多主集群转发部署模式

    如果插入的数据是Broker1,元数据信息会存储数据的相关描述和记录存放的位置(队列),它会对描述信息,也就是元数据进行同步;

    如果消费者在Broker2中进行消费,发现自己没有对应的消息,就会在自己的元数据信息中去查询,如果查询到了直接返回。如果没有查询到就会将该消息的信息携带在请求中转发到其他节点去询问,直到找到所需的信息为止。

    场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛没有顾客说要买的演唱会门票,但是他会去联系其他的黄牛询问,如果有就返回

    message-queue-high-availability-4

  5. Master-slave与Broker-cluster组合的方案

    实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到定的阶段的时候,这种使用的频率比较高。

    message-queue-high-availability-5

这些集群模式最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。反正终归三句话:

  • 要么消息共享
  • 要么消息同步
  • 要么元数据共享

RabbitMQ 快速入门

RabbitMQ的概念

RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑
RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于:它不处理快件而是接收,存储和转发消息数据。

AMQP协议

RabbitMQ是一种遵循AMQP协议的分布式消息中间件。AMQP 全称 “Advanced Message Queuing Protocol”
,高级消息队列协议。它是应用层协议的一个开发标准,为面向消息的中间件设计。

下图是采用 AMQP 协议的生产者和消费者建立和释放连接的流程图:

amqp-protocol

RabbitMQ架构组成

rabbitmq-architecture

  • Broker:就是 RabbitMQ 服务,用于接收和分发消息,接受客户端的连接,实现 AMQP 实体服务。
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace
    概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange
    或 queue 等。
  • Connection:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
  • Channel:网络信道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立连接的开销将是巨大的,效率也较低。Channel
    是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了
    channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection
    极大减少了操作系统建立 TCP connection 的开销。
  • Message:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
  • Virtual Host:虚拟节点,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
  • Exchange:交换机,是 message 到达 broker 的第一站,用于根据分发规则、匹配查询表中的 routing key,分发消息到 queue
    中去,不具备消息存储的功能。常用的类型有:direct、topic、fanout。
  • Bindings:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于
    message 的分发依据。
  • Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
  • Queue:消息队列,保存消息并将它们转发给消费者进行消费。

四大核心概念

  • 生产者:产生数据发送消息的程序是生产者。
  • 交换机:交换机是 RabbitMQ
    非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个是由交换机类型决定的。
  • 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ
    和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

rabbitmq-core-concepts

RabbitMQ角色分类

rabbitmq-role-classification

  • None:不能访问 management plugin
  • Management:查看自己相关节点信息
    • 列出自己可以通过AMQP登入的虚拟机
    • 查看自己的虚拟机节点virtual hosts的queues,exchanges和bindings信息
    • 查看和关闭自己的channels和connections
    • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息
  • Policymaker
    • 包含management所有权跟
    • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息
  • Monitoring
    • 包含management所有权限
    • 罗列出所有的virtual hosts,包括不能登录的virtual hosts
    • 查看其他用户的connections和channels信息
    • 查看节点级别的数据如clustering和memory使用情况
    • 查看所有的virtual hosts的全局统计信息。
  • Administrator
    • 最高权限
    • 可以创建和删除 virtual hosts
    • 可以查看,创建和删除users
    • 查看创建permissions
    • 关闭所有用户的connections

RabbitMQ消息模式

官网:RabbitMQ Tutorials — RabbitMQ

rabbitmq-message-mode

RabbitMQ提供6种模式,分别是 Hello World、Work Queues、Publish/Subscribe、Routing、Topics、RPC。本文详细讲述了前5种,并给出代码实现和思路。其中
Publish/Subscribe、Routing、Topics 三种模式可以统一归为 Exchange 模式,只是创建时交换机的类型不一样,分别是
fanout、direct、topic 三种交换机类型。

注意:简单模式和工作模式虽然途中没有画出交换机,但是都会有一个默认的交换机,类型为direct

rabbitmq-message-mode-2

  1. 简单模式(Hello World)

    一个生产者,一个消费者,一个队列,采用默认交换机。可以理解为生产者P发送消息到队列Q,一个消费者C接收。

  2. 工作模式(Work Queues)

    一个生产者,多个消费者,一个队列,采用默认交换机。可以理解为生产者P发送消息到队列Q,可以由多个消费者C1、C2进行接收。

  3. 发布/订阅模式(Publish/Subscribe)

    功能:一个生产者、一个 fanout 类型的交换机、多个队列、多个消费者。一个生产者发送的消息会被多个消费者获取。其中 fanout
    类型就是发布订阅模式,只有订阅该生产者的消费者会收到消息。

  4. 路由模式(Routing)

    功能:一个生产者,一个 direct 类型的交换机,多个队列,交换机与队列之间通过 routing-key
    进行关联绑定,多个消费者。生产者发送消息到交换机并且要指定routing-key,然后消息根据这交换机与队列之间的 routing-key
    绑定规则进行路由被指定消费者消费。

  5. 主题模式(Topics)

    说明:一个生产者,一个 topic 类型的交换机,多个队列,交换机与队列之间通过 routing-key 进行关联绑定,多个消费者。生产者发送消息到交换机并且要指定
    routing-key,然后消息根据这交换机与队列之间的 routing-key 绑定规则进行路由被指定消费者消费。与路由模式不同是
    routing-key 有指定的队则,可以更加的通用,满足更过的场景。routing-key 的规则如下:

    • #:匹配一个或者多个词,例如lazy.# 可以匹配 lazy.xxx 或者 lazy.xxx.xxx
    • :只能匹配一个词,例如lazy. 只能匹配 lazy.xxx
  6. 参数模式(RPC)

    功能:一个生产者,一个消费者,一个队列,采用默认交换机。可以理解为生产者P发送消息到队列Q,一个消费者C接收。不同的是,消费者C接收到消息后,会给生产者P一个反馈。

    作用:可以携带参数,根据参数进行过滤

RabbitMQ 使用

RabbitMQ 安装

这里主要介绍使用方法,因此安装并不作为重点。这里使用docker安装。

1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.9.11-management

然后等待docker容器启动完成,访问 http://localhost:15672/ 即可看到管理界面,输入账号密码(admin/admin)即可登录。

注意:这里没有做持久化,因此重启docker容器后,数据会丢失

Hello World

术语

RabbitMQ中的一些术语:

  • 生产者:发送消息的程序
    producer
  • 队列:尽管消息流经RabbitMQ和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
    queue
  • 消费者:等待接收消息的程序
    consumer

注意:生产者、消费者和队列很多时候并不在同一机器上。同一个应用程序既可以是生产者又可以是可以是消费者。

整体流程

hello world的整体流程如下,生产者将消息发送到”hello”队列中,消费者从”hello”队列中接收消息。

hello-world

下面我们使用连接rabbitmq的python客户端pika来进行操作,首先安装pika

1
pip install pika --upgrade

生产者代码

我们编写一个生产者,发送消息到队列中

sending

1
2
3
4
5
6
7
8
#!/usr/bin/env python
import pika

# 创建凭证(因为我们在docker中创建了用户admin/admin,因此这里使用这个用户)
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()

这里我们使用了pika的一个阻塞连接,然后使用连接创建一个channel,channel是消息的主要发送和接收对象。

发送的队列也必须保证存在,因此我们需要先创建一个队列。如果队列不存在,则消息将会被丢弃。

1
2
# 创建队列
channel.queue_declare(queue='hello')

在rabbitmq中,消息并不能直接发送给队列,都需要经过交换机,然后交换机再发送给队列。这里我们使用默认的交换机,交换机名称为空字符串。这个交换机是一个特殊的交换机,它会把消息发送到指定的队列中,指定的队列名称就是routing_key。

1
2
3
# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!'.encode())
print(" [x] Sent 'Hello World!'")

在我们确保消息发送成功后,我们需要关闭连接

1
2
# 关闭连接
connection.close()

完整的生产者代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import pika

# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!'.encode())
print(" [x] Sent 'Hello World!'")

# 关闭连接
connection.close()

消费者代码

我们编写一个消费者,从队列中接收消息,并打印出来

receiving

1
2
3
4
5
6
7
8
9
10
11
12

首先,需要建立连接,创建channel,这里和生产者一样

```python
#!/usr/bin/env python
import pika

# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

创建队列,保证队列存在。这里我们使用了和生产者一样的代码,因为我们不知道生产者是否已经创建了队列(真正的环境不一定生产者先启动,有可能消费者先启动),因此这里也需要创建队列。

这里创建队列使用的queue_declare方法,这个方法是幂等的,可以多次运行。也就是说,如果队列已经存在,那么就不会重复创建,如果不存在,那么就会创建。

1
2
# 创建队列
channel.queue_declare(queue='hello')

无论什么时候接收消息后,pika都会调用回调函数callback进行处理。这里我们定义了一个回调函数,用于将消息打印到控制台。

1
2
3
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)

然后我们需要告诉rabbitmq,我们使用callback来接收消息

1
2
# 告诉rabbitmq使用callback来接收消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

上面的命令需要确保队列存在,之前已经使用了queue_declare方法,因此这里不需要再次调用。如果队列不存在,那么这里会报错。

auto_ack参数表示是否自动确认消息,如果为True,那么消息一旦被消费者接收,就会被认为是已经被消费,不会再次发送给消费者。如果为False,那么消息不会被认为是已经被消费,那么消息就会一直存在,直到被消费者接收。

最后,我们需要让消费者一直运行,等待消息的到来。

1
2
3
# 消费者一直运行,等待消息的到来
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

主体执行代码如下,获取KeyBoardInterrupt异常,然后退出程序

1
2
3
4
5
6
7
8
9
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

完整的消费者代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
import pika, sys, os


# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)


def main():
# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 告诉rabbitmq使用callback来接收消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

# 消费者一直运行,等待消息的到来
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

运行结果

首先,我们启动消费者,然后启动生产者,可以看到消费者接收到了消息

1
2
3
4
5
python consumer.py

# 输出
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'

注意:第二个输出是生产者启动后的输出,这里只是为了说明生产者发送消息成功

1
2
3
4
python producer.py

# 输出
[x] Sent 'Hello World!'

Work Queues

工作队列(又名:任务队列),也就是一个生产者、多个消费者、一个队列。它的主要思想是避免排队等待,避免一个消息处理时间过久而无法处理下一个的问题。因此相比简单模式可以有多个消费者,原理就是把任务封装为消息并将其发送到队列中,这多个消费者可以一起处理队列中的任务。当有多个消费者时,消息会被平均分配给消费者,这样就可以实现负载均衡。

work-queues

这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中不可能在较短的 HTTP
请求窗口内处理复杂的任务。此时,可以将任务发送到后端的工作队列中,然后立即响应用户,从而使用户不必等待任务完成。当任务最终完成时,用户可以在
Web 页面中获得通知或结果。

本节中通过使用 time.sleep() 函数假装我们很忙来伪造复杂耗时的操作。我们将字符串中点的个数作为其复杂度;每个点将占一秒钟的”
工作时间”。例如,Hello…描述的一个假任务 将需要三秒钟。

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pika

# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='task_queue')

# 发送消息
message_list = [
'First message.',
'Second message..',
'Third message...',
'Fourth message....',
'Fifth message.....',
]
for message in message_list:
channel.basic_publish(exchange='', routing_key='task_queue', body=message.encode())
print(f" [x] Sent {message}")

# 关闭连接
connection.close()

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import os
import sys
import time
import pika


# 定义回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")


def main():
# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')
# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='task_queue')

# 告诉rabbitmq使用callback来接收消息
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)

# 消费者一直运行,等待消息的到来
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

循环调度

使用任务队列的优点之一是能够轻松的实现并行工作。如果我们正在生产者消息过多,我们可以方便的添加更多消费者,这样就可以轻松的进行水平扩展。

我们需要三个终端窗口。首先起两个消费者。

1
2
python consumer.py
[*] Waiting for messages. To exit press CTRL+C
1
2
python consumer.py
[*] Waiting for messages. To exit press CTRL+C

然后启动生产者

1
2
3
4
5
6
python producer.py
[x] Sent First message.
[x] Sent Second message..
[x] Sent Third message...
[x] Sent Fourth message....
[x] Sent Fifth message.....

可以看到,两个消费者平均接收到了消息,而且每个消息都只被一个消费者接收到。这样就实现了负载均衡。

第一个窗口显示如下:

1
2
3
4
5
6
7
8
python consumer.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received First message.
[x] Done
[x] Received Third message...
[x] Done
[x] Received Fifth message.....
[x] Done

第二个窗口显示如下:

1
2
3
4
5
6
python consumer.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received Second message..
[x] Done
[x] Received Fourth message....
[x] Done

因此,可以得出扩展消费者十分简单,只需要启动更多的消费者就可以了。

消息确认

一个任务可能需要很长时间完成,使用我们现在的代码,消息一旦被消费者接收,就会被认为是已经被消费,立即将消息标记为删除,不会再次发送给消费者。这种情况下,如果消费者死亡,那么就会丢失正在处理的消息,哪怕这个消息已经被消费者接收,只要还没有处理完,就会丢失。

一般环境下,我们不希望丢失任何消息,如果一个消费者死亡,那么我们希望将消息发送给另一个消费者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者发回 ack 确认,告诉RabbitMQ已收到并处理特定消息,并且RabbitMQ可以自由删除它。

如果消费者在没有发送 ack 的情况下死了(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ
将会知道消息未完全处理并将消息重新排队。如果同时有其他消费者在线,那么它会快速将其重新传递给另一个消费者。这样你就可以确保即使消费者偶尔死了也不会丢失任何消息。

消费者有强制交付的超时时间(默认为 30 分钟),如果程序卡住或其他问题,可以保证消息不会无限期的占用内存。可以在全局配置或者每一个队列中单独配置超时时间。

消费者交付确认时强制执行超时(默认为 30
分钟)。这有助于检测从不确认交付的有问题(卡住)的消费者。您可以按照传送确认超时中所述增加此超时 。

默认情况下,手动消息确认处于打开状态。在前面的示例中,我们通过auto_ack=True
标志显式关闭了它们。现在我们将其删除,并在循环中添加一个显式的消息确认调用(只需要修改消费者的callback函数):

1
2
3
4
5
6
7
8
9
# 定义回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)

# 告诉rabbitmq使用callback来接收消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)

使用此代码,您可以确保即使在处理消息时使用 CTRL+C 终止工作程序,也不会丢失任何内容。工作线程终止后不久,所有未确认的消息都会被重新传递。

注意,ack确认必须在接收交付的同一通道上发送。不同的通道,甚至是不同的连接,不能用来确认交付,否则会导致通道级协议异常。

忘记确认
错过 basic_ack 是一个常见的错误。这是一个很容易犯的错误,但后果却很严重。当您的客户端退出时,消息将被重新传送(这可能看起来像随机重新传送),但
RabbitMQ 会占用越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:
rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学会了如何确保即使消费者死亡,消息也不会丢失。但是如果 RabbitMQ 服务器停止,我们的消息仍然会丢失。

当 RabbitMQ 退出或者崩溃时,队列和消息就会丢失,除非显示的指定为持久的。为了保证持久性,我们需要做两件事情:我们需要将队列和消息标记为持久的。

首先,我们需要确保队列能够在 RabbitMQ 节点重新启动后继续存在。为此,我们需要将其声明为持久的:

1
channel.queue_declare(queue='task_queue', durable=True)

尽管这个命令本身是正确的,但它在我们的设置中不起作用。这是因为我们已经定义了一个名为task_queue的队列 ,它是不持久的。RabbitMQ
不允许使用不同的参数重新定义现有队列,并将向任何尝试执行此操作的程序返回错误。但有一个快速的解决方法 -
让我们声明一个具有不同名称的队列,例如new_task_queue

1
channel.queue_declare(queue='new_task_queue', durable=True)

queue_declare更改需要应用于生产者和消费者代码。

这样设置之后,即使 RabbitMQ 重新启动,队列也不会丢失。但是,消息仍然会丢失。尽管它们现在在队列中,但是 RabbitMQ
退出时,它们仍然会丢失。我们需要将我们的消息也标记为持久 - 通过提供带有pika.spec.PERSISTENT_DELIVERY_MODE
值的delivery_mode属性:

1
channel.basic_publish(exchange='', routing_key='new_task_queue', body=message.encode(), properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE))

注意:将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ
接受消息并将其保存到磁盘之间仍然有一个很小的时间窗口。此外,RabbitMQ 不会对每条消息执行fsync(2) ——

它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 publisher confirms

公平调度

很明显,调度只是平均分配任务给消费者。这种情况下,如果一个消费者比另一个消费者处理消息慢,或者所有奇数消息都很重但是偶数消息都很轻,那么就会导致一个消费者一直很忙,而另一个消费者几乎不做任何工作。RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 只是在消息进入队列时才调度该消息。它不会查看消费者未确认消息的数量。它只是盲目地将每条第 n 条消息分派给第 n 个消费者。

fair-dispatch

为了解决这个问题,我们可以使用设置为prefetch_count=1Channel.basic_qos方法。这使用basic.qos协议方法告诉 RabbitMQ 不要一次向消费者提供多于一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向消费者发送新消息。相反,它会将其分派给下一个不忙的消费者。

1
channel.basic_qos(prefetch_count=1)

关于队列大小的注意事项 :

如果所有消费者都很忙,您的队列可能会被填满。您需要密切关注这一点,也许添加更多消费者,或者使用消息 TTL

来避免这种情况。

完整代码

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import pika
import sys

# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')

# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='new_task_queue', durable=True)

# 发送消息
message_list = [
'First message.',
'Second message..',
'Third message...',
'Fourth message....',
'Fifth message.....',
]

for message in message_list:
channel.basic_publish(exchange='', routing_key='new_task_queue', body=message.encode(), properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE))
print(f" [x] Sent {message}")

# 关闭连接
connection.close()

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os
import sys
import time

import pika


# 定义回调函数
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
# 创建凭证
credentials = pika.PlainCredentials('admin', 'admin')

# 建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='new_task_queue', durable=True)


# 设置每个消费者每次只能接收一条消息
channel.basic_qos(prefetch_count=1)

# 告诉rabbitmq使用callback来接收消息
channel.basic_consume(queue='new_task_queue', on_message_callback=callback)

# 消费者一直运行,等待消息的到来
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

使用消息确认和prefetch_count您可以设置工作队列。即使 RabbitMQ 重新启动,持久性选项也能让任务继续存在。

Publish/Subscribe

Publish/Subscribe、Routing、Topics,这三种模式可以统一归为 Exchange 模式,它们只是创建时交换机的类型不一样,分别是 fanout、direct、topic类型

这节我们就来深入了解一下交换机 Exchange 以及它的几种工作模式。

Exchange

在前面的教程中,我们已经了解了如何向队列发送消息和从队列接收消息。现在是时候介绍 Rabbit 中完整的消息传递模型了。

让我们快速回顾一下之前教程中介绍的内容:

  1. 一个生产者应用程序发送消息。
  2. 一个队列存储消息到缓冲区直到它们可以被处理。
  3. 一个消费者应用程序接收消息并处理它们。

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。

相反,生产者只能将消息发送到交换器。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将消息推送到队列中。交换机必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列?是否应该将其附加到许多队列中?或者应该将其丢弃。其规则由交换类型定义。

exchange

RabbitMQ有几种可用的交换类型:direct、topic、headers 和 fanout。这里我们将重点关注最后一个类型fanout,其他的会在之后说明。让我们创建一个该类型的交换,并将其称为logs

1
channel.exchange_declare(exchange='logs', exchange_type='fanout')

fanout类型的交换机非常简单,它只是将接收到的所有消息广播到它知道的所有队列。这正是我们的日志记录器所需要的。

列出所有的交换机
如果您想要列出服务器上的交换,可以运行以下命令:rabbitmqctl list_exchanges

列表中会有一写amq.*的交换和默认(未命名)交换。这些是默认创建的,但目前您不太可能需要使用它们。

默认交换机
在之前的部分,我们使用默认的交换机发送消息(程序中是一个空字符串””)。空字符串表示默认或无名交换机。消息将路由到具有routing_key指定名称的队列(如果存在)。

现在我们可以将消息发送到我们的日志交换机上了。

1
channel.basic_publish(exchange='logs', routing_key='', body=message.encode())

临时队列

还记得之前使用过的特定名称的队列吗(比如hello和task_queue)?当我们想要在生产者和消费者之间共享队列时,为队列命名非常重要。

但是我们的logs显然并非如此。我们希望监听所有的日志消息,而不仅仅是其中的一部分。同时只对当前流动的消息感兴趣,而不是旧的消息。为了解决这个问题,我们需要两件事。

首先,每当我们连接到 RabbitMQ 时,我们都需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者让服务器为我们选择一个随机队列名称。我们可以通过向queue_declare提供空队列参数来做到这一点:

1
result = channel.queue_declare(queue='')

这里我们使用了随机队列名称,通过将队列名称设置为空字符串,我们告诉服务器为我们选择一个随机队列名称。如果我们想要知道队列的名称,可以通过result.method.queue获得。比如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

其次,一旦消费者连接关闭,队列就应该被删除。有一个专用标志:

1
result = channel.queue_declare(queue='', exclusive=True)

绑定

bindings

现在我们已经准备好将我们的日志消息发送到我们的交换机,交换机需要知道怎么发送消息给队列。交换机和队列之间的关系称为绑定。绑定是交换机和队列之间的关系。

1
channel.queue_bind(exchange='logs', queue=result.method.queue)

现在开始,logs交换机将会把消息发送到我们的队列中。

列出绑定
如果您想要列出服务器上的绑定,可以运行以下命令:rabbitmqctl list_bindings

完整代码

publish-subscribe

发送消息日志的生产者和之前没有什么区别,主要就是创建了一个交换机,然后发送消息到交换机上。发送时需要提供routing_key,但是fanout类型的交换机会忽略这个值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
'''emit_log.py'''
import sys

import pika


credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message_list = [
'First Log Message',
'Second Log Message',
'Third Log Message',
'Fourth Log Message',
'Fifth Log Message'
]
for message in message_list:
channel.basic_publish(exchange='logs', routing_key='', body=message.encode())
print(f" [x] Sent {message}")

connection.close()

如您所见,建立连接后我们声明了交换。此步骤是必要的,因为发布到不存在的交换机是不允许的。

此时如果还没有队列绑定到交换器,消息将会丢失,但这对我们来说影响不大,如果还没有消费者在监听,我们可以安全地丢弃该消息。

接收消息的消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
'''receive_logs.py'''
import pika

def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

以上就完成了所有的代码。如果希望将日志保存到文件,只需要打开控制台并输入:

1
python receive_logs.py>logs_from_rabbit.log

我们完成了。如果要将日志保存到文件中,只需打开控制台并输入:

1
python receive_logs.py>logs_from_rabbit.log

如果只是需要在控制台中查看日志:

1
python receive_logs.py

当然,发送消息的生产者也需要启动:

1
python emit_log.py

使用rabbitmqctl list_bindings,您可以验证代码是否确实按照我们的需要创建了绑定和队列。运行两个receive_logs.py程序后,您应该看到类似以下内容:

1
2
3
4
5
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.

结果表明,我们的两个队列都绑定到logs交换机上了。当我们停止两个消费者中的一个时,我们会看到绑定消失:

1
2
3
4
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => ...done.

Routing

绑定键

在前面的代码中,我们已经创建了绑定。您可能还记得这样的代码:

1
channel.queue_bind(exchange='logs', queue=queue_name)

绑定是交换器和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

绑定可以采用额外的routing_key参数。为了避免与basic_publish参数混淆,我们将其称为绑定键。下面创建一个带有绑定键的绑定:

1
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='black')

绑定键的含义取决于交换类型。我们之前使用的fanout交换机会完全忽略绑定键。

Direct交换机

之前的日志系统会广播所有的消息给所有的消费者,但是我们可能只想让严重的错误消息发送给日志文件写入磁盘,而不是所有的日志消息浪费磁盘空间。

fanout交换机并不满足我们的需求,它只能进行无意识的广播。此时我们可以使用direct类型的交换机来实现这个功能。direct交换机背后的路由算法很简单:消息会被推送到绑定键与消息的路由键完全匹配的队列(注意这两个键的区别)。

为了说明这一点,做了以下设置:

direct-exchange

  1. 消息发送给direct交换机
  2. 绑定键”error”绑定到队列Q1
  3. 绑定键”info”和”warning”绑定到队列Q2
  4. 其他消息被丢弃

多重绑定

multiple-bindings

使用相同的绑定键绑定多个队列是完全合法的。在我们的设定中,我们也可以使用绑定键error
在交换机和Q2之间添加绑定。这种情况下,direct交换机将会把消息发送给两个队列。行为类似于fanout交换机。

生产者发出日志

我们将在日志系统中使用下面这个模型。我们将把消息发送到direct交换器,而不是fanout交换机。我们将提供日志严重性作为路由键。这样接收脚本将能够选择它想要接收的严重性。让我们首先关注发出日志的脚本。

像往常一样,我们需要首先创建一个交换:

1
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

然后我们可以将消息发送到交换机:

1
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message.encode())

消费者接收日志

消费者和上一节类似,但是有个例外,我们将为我们感兴趣的每个严重性创建一个新的绑定。

1
2
3
4
5
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

完整代码

routing

发送日志的生产者代码如下(emit_log_direct.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
'''emit_log_direct.py'''
import sys

import pika


credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

message_list = [
('info', 'this is a first info log'),
('warning', 'this is a first warning log'),
('error', 'this is a first error log'),
('info', 'this is a second info log'),
('warning', 'this is a second warning log'),
('error', 'this is a second error log'),
('info', 'this is a third info log'),
('warning', 'this is a third warning log'),
('error', 'this is a third error log'),
]

for severity, message in message_list:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message.encode())
print(f" [x] Sent {severity}:{message}")

connection.close()

接收日志的消费者代码如下(receive_logs_direct.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
'''receive_logs_direct.py'''
import sys

import pika

def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)

for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

运行如下:

  1. 如果只想保存’info’和’warning’
    日志信息,只需要打开控制台并输入:python receive_logs_direct.py info warning>logs_from_rabbit.log
  2. 如果想在屏幕上查看所有日志消息,打开一个新终端并执行:python receive_logs_direct.py info warning error
  3. 如果想发出错误日志消息,只需键入python emit_log_direct.py即可

Topics

上面的代码已经实现了根据日志的严重性来过滤日志消息,但是如果我们想根据日志的发生模块来过滤日志消息呢?比如我们想把所有来自”
cron”的消息都保存到磁盘,而不是所有的错误日志。这时候就需要使用到Topic交换机了。

Topic交换机

发送到Topic交换机的消息不能有任意的routing_key -
它必须是一个由点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的一些功能。有效的路由键示例:”
stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。路由键中可以有任意多个单词,最多255个字节。

绑定键也必须采用相同的形式。topic交换机背后的逻辑与direct交换机类似,使用特定路由键发送的消息将被传递到与匹配的绑定键绑定的所有队列。

但是,topic交换机绑定键有两种重要的特殊情况:

  • * 可以恰好替代一个单词。
  • # 可以替代零个或多个单词。

通过一个例子来解释这一点是最简单的:

topic

在此示例中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:”..“。

我们创建了三个绑定:Q1与绑定键”.orange.“绑定,Q2与”..rabbit”和”lazy.#”绑定。

这些绑定可以概括为:

  • Q1 对所有橙色动物都感兴趣。
  • Q2 想听听关于兔子的一切,以及关于懒惰动物的一切。

与路由键的匹配规则如下:

  1. 路由键设置为”quick.orange.rabbit”,那么消息就将被传递到两个队列。
  2. “lazy.orange.elephant”也将发送给两个队列。
  3. “quick.orange.fox”只会进入第一个队列。
  4. “lazy.brown.fox”只会进入第二个队列。
  5. “lazy.pink.rabbit”只会被传递到第二个队列一次,即使它匹配两个绑定。
  6. “quick.brown.fox”不匹配任何绑定,因此它将被丢弃。
  7. 如果我们违反规则发送了一个单词或者四个单词的消息(例如”orange”或者”quick.orange.new.rabbit”)
    ,因为全部匹配失败,这些消息将不会被传递到任何队列,它们将会被丢弃。
  8. 而”lazy.orange.new.rabbit”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

topic exchange是非常强大的,可以实现其他类型的交换机。

当队列使用”#”绑定键绑定时,它将接收所有消息,无论路由键如何,就像在fanout交换机中一样。
当绑定中没有使用特殊字符”*”和”#”时,topic交换机的行为就像direct交换机一样。

代码实现

我们在日志中使用topic交换机。首先假定日志的路由键有两个单词:”.“。

生产者发送消息的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
'''emit_log_topic.py'''
import sys

import pika


credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

message_list = [
('kern.critical', 'A critical kernel error'),
('kern.error', 'A kernel error'),
('kern.warning', 'A kernel warning'),
('auth.critical', 'A critical auth error'),
('auth.error', 'A auth error'),
('auth.warning', 'A auth warning'),
('cron.critical', 'A critical cron error'),
('cron.error', 'A cron error'),
('cron.warning', 'A cron warning'),
]

for routing_key, message in message_list:
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message.encode())
print(f" [x] Sent {routing_key}:{message}")

connection.close()

消费者接收消息的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
'''receive_logs_topic.py'''
import sys

import pika

def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body.decode()}")

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

运行如下:

  1. 如果想获取所有的日志消息,只需要打开控制台并输入:python receive_logs_topic.py "#"
  2. 如果只想获取来自”cron”的日志消息,只需要打开控制台并输入:python receive_logs_topic.py "cron.*"
  3. 如果只想获取”critical”严重性的日志消息,只需要打开控制台并输入:python receive_logs_topic.py "*.critical"
  4. 还可以创建多个绑定,如果想获取来自”kern”或者”warning”
    严重性的日志消息,只需要打开控制台并输入:python receive_logs_topic.py "kern.*" "*.warning"
  5. 如果想发出日志消息,只需键入python emit_log_topic.py即可

RPC

在介绍工作队列的时候,我们初步尝试了怎么在多个消费者中分配耗时的任务。我们使用了一个简单的队列来分发任务。

但是如果我们需要在远程计算机上运行一个函数并等待结果怎么办?这种模式通常称为远程过程调用或RPC。

在本节中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有任何值得分发的耗时任务,因此我们将创建一个返回斐波那契数的虚拟
RPC 服务。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,该方法发送 RPC 请求并阻塞,直到收到答案:

1
2
3
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print(f"fib(4) is {result}")

关于RPC

尽管 RPC 是计算中非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地函数还是慢速 RPC 时,就会引发问题。
类似的混乱会导致系统不可预测,并给调试增加不必要的复杂性。滥用 RPC 不但不会简化软件,反而会导致难以维护的代码。

考虑到这一点,请考虑以下建议:

  • 确保清楚地知道哪个函数调用是本地的,哪个是远程的。
  • 对系统制作文档或记录。明确组件之间的依赖关系。
  • 处理错误情况。当RPC服务器长时间宕机时,客户端应该如何反应?

当有疑问的时候请避免使用RPC。如果可以的话,您应该使用异步管道将结果异步推送到下一个计算阶段,而不是类似RPC的阻塞进行计算。

回调队列

一般来说,通过 RabbitMQ 进行 RPC 调用很容易。客户端发送请求消息,服务器回复响应消息。为了接收响应,客户端需要随请求发送”回调”
队列的地址。

1
2
3
4
5
6
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties(reply_to=callback_queue), body=request)

# ... and some code to read a response message from the callback_queue ...

消息属性
AMQP 0-9-1 协议预定义了消息附带的 14 个属性。大多数属性很少使用,但以下属性除外:

  • delivery_mode:将消息标记为持久(值为2)或瞬态(任何其他值)。
  • content_type:用于描述编码的 mime-type。例如,对于经常使用的 JSON 编码,最好将此属性设置为:application/json。
  • reply_to:通常用于命名回调队列。
  • correlation_id:用于将 RPC 响应与请求关联起来。

Correlation id

使用上面介绍的方法,我们建议为每个 RPC 请求创建一个回调队列。这是相当低效的,但幸运的是有一个更好的方法,就是为每个客户端创建一个回调队列。

这就产生了一个新的问题,在收到响应之后,我们不知道该响应属于哪个请求。这时候correlation_id属性就派上用场了。我们将为每个请求设置唯一值。然后,当我们在回调队列中收到消息时,我们将查看此属性,并基于此我们将能够将响应与请求进行匹配。如果我们看到未知的correlation_id值,就可以断定,这个响应不属于我们发出的请求,就可以安全的丢弃消息了。

你可能会问,为什么我们要忽略回调队列中的未知消息,而不是因错误而抛出异常?这是由于服务器端可能存在竞争条件。虽然不太可能,但
RPC 服务器有可能在向我们发送答案之后并且在发送请求的确认消息之前就挂掉了。如果发生这种情况,重新启动的 RPC
服务器将再次处理该请求。这就是为什么在客户端我们必须优雅地处理重复的响应,并且 RPC 理想情况下应该是幂等的。

总结

rabbitmq-rpc

我们的 RPC 将像这样工作:

  1. 当Client启动时,它会创建一个匿名的独占的回调队列。
  2. 对于 RPC 请求,客户端发送一条具有两个属性的消息
    • reply_to:它设置为回调队列。
    • correlation_id,它设置为每个请求的唯一值。
  3. 请求被发送到rpc_queue队列。
  4. RPC 工作线程(又名:服务器)正在等待该队列上的请求。当请求出现时,它会执行作业并使用来自reply_to字段的队列将带有结果的消息发送回客户端。
  5. 客户端等待回调队列上的数据。当出现消息时,它会检查correlation_id属性。如果它与请求中的值匹配,它将向应用程序返回响应。

代码实现

rpc 服务器(类似于消费者)代码相当简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python
'''rpc_server.py'''
import pika

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
n = int(body)

print(f" [.] fib({n})")
response = fib(n)

ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器代码相当简单:

  1. 像往常一样,我们首先建立连接并声明队列rpc_queue。
  2. 我们声明我们的斐波那契函数。它假设仅有效的正整数输入。(不要指望这个适用于大数字,它可能是最慢的递归实现)。
  3. 我们为RPC服务器的核心basic_consume声明一个回调on_request 。当收到请求时执行。它完成工作并发送回响应。
  4. 我们可能想要运行多个服务器进程。为了将负载均匀地分布在多个服务器上,我们需要设置prefetch_count设置。

RPC 客户端(类似于生产者)代码稍微复杂一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python
'''rpc_client.py'''
import uuid

import pika

class FibonacciRpcClient:
def __init__(self):
credentials = pika.PlainCredentials('admin', 'admin')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))

self.channel = self.connection.channel()

result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.corr_id),
body=str(n))
self.connection.process_data_events(time_limit=None)
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客户端代码稍微复杂一些:

  1. 我们建立一个连接、通道并声明一个专用的回调队列用于回复。
  2. 我们订阅callback_queue ,以便我们可以接收RPC响应。
  3. 在每个响应上执行的on_response回调正在执行一项非常简单的工作,对于每个响应消息,它都会检查correlation_id是否是我们正在寻找的那个。如果是这样,它将响应保存在self.response中并打破消费循环。
  4. 接下来,我们定义主要调用方法,它执行的实际的 RPC 请求。
  5. 在call方法中,我们生成一个唯一的correlation_id数字并保存它,on_response回调函数将使用该值来捕获适当的响应。
  6. 同样在call方法中,我们发布请求消息,具有两个属性:reply_to和correlation_id。
  7. 最后,我们等待正确的响应到达并将响应返回给用户。

我们的 RPC 服务现已准备就绪。我们可以启动服务器,运行如下:

1
python rpc_server.py

然后我们可以启动客户端,运行如下:

1
python rpc_client.py

所提出的设计并不是 RPC 服务唯一可能的实现,但它具有一些重要的优点:

  1. 如果 RPC 服务器太慢,您可以通过运行另一台服务器来进行扩展。比如尝试在新控制台中运行第二个rpc_server.py 。
  2. 在客户端,RPC 仅需要发送和接收一条消息。不需要像queue_declare这样的同步调用 。因此,RPC 客户端对于单个 RPC 请求仅需要一次网络往返。
文章目录
  1. 1. MQ相关的概念
    1. 1.1. MQ的基本概念
    2. 1.2. 常用的MQ
    3. 1.3. 消息队列协议
    4. 1.4. 常用消息中间件协议
    5. 1.5. 消息队列持久化
    6. 1.6. 消息的分发策略
      1. 1.6.1. 场景分析一
      2. 1.6.2. 场景分析二
      3. 1.6.3. 消息分发策略的机制和对比
    7. 1.7. 消息队列的高可用和高可靠
  2. 2. RabbitMQ 快速入门
    1. 2.1. RabbitMQ的概念
    2. 2.2. AMQP协议
    3. 2.3. RabbitMQ架构组成
    4. 2.4. 四大核心概念
    5. 2.5. RabbitMQ角色分类
    6. 2.6. RabbitMQ消息模式
  3. 3. RabbitMQ 使用
    1. 3.1. RabbitMQ 安装
    2. 3.2. Hello World
      1. 3.2.1. 术语
      2. 3.2.2. 整体流程
      3. 3.2.3. 生产者代码
      4. 3.2.4. 消费者代码
      5. 3.2.5. 运行结果
    3. 3.3. Work Queues
      1. 3.3.1. 生产者代码
      2. 3.3.2. 消费者代码
      3. 3.3.3. 循环调度
      4. 3.3.4. 消息确认
      5. 3.3.5. 消息持久性
      6. 3.3.6. 公平调度
      7. 3.3.7. 完整代码
    4. 3.4. Publish/Subscribe
      1. 3.4.1. Exchange
      2. 3.4.2. 临时队列
      3. 3.4.3. 绑定
      4. 3.4.4. 完整代码
    5. 3.5. Routing
      1. 3.5.1. 绑定键
      2. 3.5.2. Direct交换机
      3. 3.5.3. 多重绑定
      4. 3.5.4. 生产者发出日志
      5. 3.5.5. 消费者接收日志
      6. 3.5.6. 完整代码
    6. 3.6. Topics
      1. 3.6.1. Topic交换机
      2. 3.6.2. 代码实现
    7. 3.7. RPC
      1. 3.7.1. 客户端接口
      2. 3.7.2. 回调队列
      3. 3.7.3. Correlation id
      4. 3.7.4. 总结
      5. 3.7.5. 代码实现