消息队列高手课-基础篇-01

Posted by 瞿广 on Tuesday, October 22, 2019

TOC

03-消息模型:主题和队列有什么区别?

主题和队列有什么区别?

在互联网的架构师圈儿中间,流传着这样一句不知道出处的名言,我非常认同和喜欢: 好的架构不是设计出 来的,而是演进出来的。 现代的消息队列呈现出的模式,一样是经过之前的十几年逐步演进而来的。

最初的消息队列,就是一个严格意义上的队列。在计算机领域,“队列(Queue)”是一种数据结构,有完 整而严格的定义。在维基百科中,队列的定义是这样的:

队列是先进先出(FIFO, First-In-First-Out)的线性表(Linear List)。在具体应用中通常用链 表或者数组来实现。队列只允许在后端(称为rear)进行插入操作,在前端(称为front)进 行删除操作。

这个定义里面包含几个关键点,第一个是先进先出,这里面隐含着的一个要求是,在消息入队出队过程中, 需要保证这些消息严格有序,按照什么顺序写进队列,必须按照同样的顺序从队列中读出来。不过,队列是 没有“读”这个操作的,“读”就是出队,也就是从队列中“删除”这条消息。

早期的消息队列,就是按照“队列”的数据结构来设计的。我们一起看下这个图,生产者(Producer)发 消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就 称为“队列”。 这就是最初的一种消息模型: 队列模型。

/img/geektime-mq-queue.png

如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。

显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

为了解决这个问题,演化出了另外一种消息模型 :“发布-订阅模型(Publish-Subscribe Pattern)”。

/img/geektime-mq-publish-subscribe.png

在发布-订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber), 服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要 先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

在消息领域的历史上很长的一段时间,队列模式和发布-订阅模式是并存的,有些消息队列同时支持这两种 消息模型,比如ActiveMQ。我们仔细对比一下这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别。 它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。

实际上,在这种发布-订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。也就是说, 发布-订阅模型在功能层面上是可以兼容队列模型的。

现代的消息队列产品使用的消息模型大多是这种发布-订阅模型,当然也有例外。

RabbitMQ的消息模型

这个例外就是RabbitMQ,它是少数依然坚持使用队列模型的产品之一。那它是怎么解决多个消费者的问题 呢?你还记得我在上节课中讲到RabbitMQ的一个特色Exchange模块吗?在RabbitMQ中,Exchange位于生 产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给Exchange,由Exchange上配 置的策略来决定将消息投递到哪些队列中。

/img/geektime-mq-rabbitmq.png

同一份消息如果需要被多个消费者来消费,需要配置Exchange将消息发送到多个队列,每个队列中都存放 一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布-订阅模型中,“一份 消息数据可以被多个订阅者来多次消费”这样的功能。具体的配置你可以参考RabbitMQ官方教程,其中一 个章节专门是讲如何实现发布订阅的。

RocketMQ的消息模型

讲完了RabbitMQ的消息模型,我们再来看看RocketMQ。RocketMQ使用的消息模型是标准的发布-订阅模型,在RocketMQ的术语表中,生产者、消费者和主题与我在上面讲的发布-订阅模型中的概念是完全一样的。

但是,在RocketMQ也有队列(Queue)这个概念,并且队列在RocketMQ中是一个非常重要的概念,那队列在RocketMQ中的作用是什么呢?这就要从消息队列的消费机制说起。

几乎所有的消息队列产品都使用一种非常朴素的“请求-确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是Broker,服务端在收到消息并将消息写入主题或者队列中后会给生产者发送确认的响应。

如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。

这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。 也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的 数量来提升消费端总体的消费性能。 为了解决这个问题,RocketMQ在主题下面增加了队列的概念。

  • 每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

  • RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被Consumer Group1消费过, 也会再给Consumer Group2消费。

  • 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。

  • 在Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

RocketMQ的消息模型中,比较关键的概念就是这些了。为了便于你理解,我画了下面这张图:

/img/geektime-mq-rocketmq.png

你可以对照这张图再把我刚刚讲的这些概念继续消化一下,加深理解。

Kafka的消息模型

我们再来看看另一种常见的消息队列Kafka,Kafka的消息模型和RocketMQ是完全一样的,我刚刚讲的所有 RocketMQ中对应的概念,和生产消费过程中的确认机制,都完全适用于Kafka。唯一的区别是,在Kafka 中,队列这个概念的名称不一样,Kafka中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

小结

我们来总结一下本节课学习的内容。首先我们讲了队列和主题的区别,这两个概念的背后实际上对应着两种 不同的消息模型:队列模型和发布-订阅模型。然后你需要理解,这两种消息模型其实并没有本质上的区 别,都可以通过一些扩展或者变化来互相替代。

常用的消息队列中,RabbitMQ采用的是队列模型,但是它一样可以实现发布-订阅的功能。RocketMQ和 Kafka采用的是发布-订阅模型,并且二者的消息模型是基本一致的。

最后提醒你一点,我这节课讲的消息模型和相关的概念是业务层面的模型,深刻理解业务模型有助于你用最佳的姿势去使用消息队列。

但业务模型不等于就是实现层面的模型。比如说MySQL和Hbase同样是支持SQL的数据库,它们的业务模型 中,存放数据的单元都是“表”,但是在实现层面,没有哪个数据库是以二维表的方式去存储数据的, MySQL使用B+树来存储数据,而HBase使用的是KV的结构来存储。同样,像Kafka和RocketMQ的业务模型 基本是一样的,并不是说他们的实现就是一样的,实际上这两个消息队列的实现是完全不同的


04-如何利用事务消息实现分布式事务?

其实很多场景下,我们“发消息”这个过程,目的往往是通知另外一个系统或者模块去更新数据,消息队列 中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。

什么是分布式事务?

在实际应用中,比较常见的分布式事务实现有2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try- Confirm-Cancel)和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

2PC和TCC不是我们本次课程讨论的内容,就不展开讲了,感兴趣的同学可以自行学习。

消息队列是如何实现分布式事务的?

事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能。

回到订单和购物车这个例子,我们一起来看下如何用消息队列来实现分布式事务。

/img/geektime-mq-.png

首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka和RocketMQ给出了2种不同的解决方案。 Kafka的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交, 直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ则给出了另外一种解决方案。

RocketMQ中的分布式事务实现

在RocketMQ中的事务实现中,增加了 事务反查的机制来解决事务消息提交失败的问题。如果Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个事物反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。

在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ会自动根据事务反查的结果提交或 者回滚事务消息。

这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。

综合上面讲的通用事务消息的实现和RocketMQ的事务反查机制,使用RocketMQ事务消息功能实现分布式 事务的流程如下图:

/img/geektime-mq--rocketmq.png

小结

我们通过一个订单购物车的例子,学习了事务的ACID四个特性,以及如何使用消息队列来实现分布式事务。然后我们给出了现有的几种分布式事务的解决方案,包括事务消息,但是这几种方案都不能解决分布式系统中的所有问题,每一种方案都有局限性和特定的适用场景。

最后,我们一起学习了RocketMQ的事务反查机制,这种机制通过定期反查事务状态,来补偿提交事务消息可能出现的通信失败。在Kafka的事务功能中,并没有类似的反查机制,需要用户自行去解决这个问题。

但是,这不代表RocketMQ的事务功能比Kafka更好,只能说在我们这个例子的场景下,更适合使用 RocketMQ。Kafka对于事务的定义、实现和适用场景,和RocketMQ有比较大的差异,后面的课程中,我们会专门讲到Kafka的事务的实现原理。


05-如何确保消息不会丢失

其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。虽然不同的 消息队列提供的API不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

这节课我们就来讲一下,消息队列是怎么保证消息可靠传递的,这里面的实现原理是怎么样的。当你熟知原 理以后,无论你使用任何一种消息队列,再简单看一下它的API和相关配置项,就能很快知道该如何配置消 息队列,写出可靠的代码,避免消息丢失。

检测消息丢失的方法

我们说,用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道。一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。

如果是IT基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。 我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在Producer端,我们给每个发出的消息附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。

我们可以利用消息队列的有序性来验证是否有消息丢失。 原理非常简单,在Producer端,我们给每个发出 的消息附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。 如果没有消息丢失,Consumer收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是 上一条消息的序号+1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是 哪条消息,方便进一步排查原因。

大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。

首先,像Kafka和RocketMQ这样的消息队列,它是不保证在Topic上的严格顺序的,只能保证分区上的消息 是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果你的系统中Producer是多实例的,由于并不好协调多个Producer之间的发送顺序,所以也需要每个Producer分别生成各自的消息序号,并且需要附加上Producer的标识,在Consumer端按照每个Producer分 别来检测序号的连续性。

Consumer实例的数量最好和分区数量一致,做到Consumer和分区一一对应,这样会比较方便地在Consumer内检测消息序号的连续性。

确保消息可靠传递

讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。

你可以看下这个图,一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。

img/geektime-mq-3-parts.png

  • 生产阶段: 在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端。
  • 存储阶段: 在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • 消费阶段: 在这个阶段,Consumer从Broker上拉取消息,经过网络传输发送到Consumer上。

1. 生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消 息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没 收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢 失。以Kafka为例,我们看一下如何可靠地发送消息: 同步发送时,只要注意捕获异常即可。

  try {
    RecordMetadata metadata = producer.send(record).get(); 
    System.out.println("消息发送成功。");
  } catch (Throwable e) { 
    System.out.println("消息发送失败!");
    System.out.println(e);
  }

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使 用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {
      if (metadata != null) {
          System.out.println("消息发送成功。"); 
    } else {
          System.out.println("消息发送失败!");
          System.out.println(exception);
      }
});

2. 存储阶段

在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker出现了故 障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。

对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认 响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ中,需要将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘。

如果是Broker是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2个以上的节点,再 给客户端回复发送确认响应。这样当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发 生消息丢失。后面我会专门安排一节课,来讲解在集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的。

3. 消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的 消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息 的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出 错导致丢失。 你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业 务逻辑之后,再发送消费确认。

同样,我们以用Python语言消费RabbitMQ消息为例,来看一下如何实现一段可靠的消费代码:

 def callback(ch, method, properties, body):
  print(" [x] 收到消息 %r" % body) 
   # 在这儿处理收到的消息 
  database.save(body)
  print(" [x] 消费完成")
   # 完成消费业务逻辑后发送消费确认响应 
  ch.basic_ack(delivery_tag = method.delivery_tag)
  channel.basic_consume(queue='hello', on_message_callback=callback)

你可以看到,在消费的回调方法callback中,正确的顺序是,先是把消息保存到数据库中,然后再发送消费确认响应。这样如果保存消息到数据库失败了,就不会执行消费确认的代码,下次拉到的还是这条消息,直到消费成功。

小结

这节课我带大家分析了一条消息从发送到消费整个流程中,消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息 队列的可靠性机制,确保消息不会丢失。

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。
  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个Broker宕机或者磁盘损坏而丢失。
  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

你在理解了这几个阶段的原理后,如果再出现丢消息的情况,应该可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步深入分析,快速找到问题原因。