消息队列高手课-基础篇-02

Posted by 瞿广 on Tuesday, October 22, 2019

TOC

06-如何处理消费过程中的重复消息?

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错 误。

比如说,一个消费订单消息,统计下单金额的微服务,如果没有正确处理重复消息,那就会出现重复统计,导致统计结果错误。 你可能会问,如果消息队列本身能保证消息不重复,那应用程序的实现不就简单了?那有没有消息队列能保证消息不重复呢?

消息重复的情况必然存在

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。我们现在常用的绝大部分消息队列提 供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

说到这儿我知道肯定有的同学会反驳我:“你说的不对,我看过Kafka的文档,Kafka是支持Exactly once 的。”我在这里跟这些同学解释一下,你说的没错,Kafka的确是支持Exactly once,但是我讲的也没有问 题,为什么呢?

Kafka支持的“Exactly once”和我们刚刚提到的消息传递的服务质量标准“Exactly once”是不一样的,它是Kafka提供的另外一个特性,Kafka中支持的事务也和我们通常意义理解的事务有一定的差异。在Kafka 中,事务和Excactly once主要是为了配合流计算使用的特性,我们在专栏“进阶篇”这个模块中,会有专门 的一节课来讲Kafka的事务和它支持的Exactly once特性。

稍微说一些题外话,Kafka的团队是一个非常善于包装和营销的团队,你看他们很巧妙地用了两个所有人都非常熟悉的概念“事务”和“Exactly once”来包装它的新的特性,实际上它实现的这个事务和Exactly once 并不是我们通常理解的那两个特性,但是你深入了解Kafka的事务和Exactly once后,会发现其实它这个特性 虽然和我们通常的理解不一样,但确实和事务、Exactly once有一定关系。

这一点上,我们都要学习Kafka团队。一个优秀的开发团队,不仅要能写代码,更要能写文档,能写Slide(PPT),还要能讲,会分享。对于每个程序员来说,也是一样的。

我们把话题收回来,继续来说重复消息的问题。既然消息队列无法保证消息不重复,就需要我们的消费代码能够接受“消息是可能会重复的”这一现状,然后,通过一些方法来消除重复消息对业务的影响。

用幂等性解决重复消息问题

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。 幂等(Idempotence) 本来是一个数学上的概念,它是这样定义的:

如果一个函数f(x)满足:f(f(x)) = f(x),则函数f(x)满足幂等性。 这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次 执行所产生的影响均与一次执行的影响相同。

一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。 我们举个例子来说明一下。在不考虑并发的情况下,“将账户X的余额设置为100元”,执行一次后对系统 的影响是,账户X的余额变成了100元。只要提供的参数100元不变,那即使再执行多少次,账户X的余额始 终都是100元,不会变化,这个操作就是一个幂等的操作。

再举一个例子,“将账户X的余额加100元”,这个操作它就不是幂等的,每执行一次,账户余额就会增加 100元,执行多次和执行一次对系统的影响(也就是账户的余额)是不一样的。 如果我们系统消费消息的业务逻辑具备幂等性,那就不用担心消息重复的问题了,因为同一条消息,消费一次和消费多次对系统的影响是完全一样的。也就可以认为,消费多次等于消费一次。 从对系统的影响结果来说:At least once + 幂等消费 = Exactly once。 那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性 的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

下面我给你介绍几种常用的设计幂等操作的方法:

1. 利用数据库的唯一约束实现幂等

例如我们刚刚提到的那个不具备幂等特性的转账的例子:将账户X的余额加100元。在这个例子中,我们可 以通过改造业务逻辑,让它具备幂等性。 首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现 的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单ID、账户ID和变 更金额,然后给转账单ID和账户ID这两个字段联合起来创建一个唯一约束,这样对于相同的转账单ID和账 户ID,表里至多只能存在一条记录。

这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操 作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账 户ID转账单ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失 败,这样就实现了一个幂等的操作。我们只要写一个SQL,正确地实现它就可以了。 基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系 统都可以用于实现幂等,比如,你可以用Redis的SETNX命令来替代数据库中的唯一约束,来实现幂等消 费。

2. 为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

比如,刚刚我们说过,“将账户X的余额增加100元”这个操作并不满足幂等性,我们可以把这个操作加上 一个前置条件,变为:“如果账户X当前的余额为500元,将余额加100元”,这个操作就具备了幂等性。对 应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中, 当前余额是否与消息中的余额相等,只有相等才执行变更操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判 断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是 否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号+1,一样可以实现幂等 更新。

3. 记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现 幂等性方法:记录并检查操作,也称为“Token机制或者GUID(全局唯一ID)机制”,实现的思路特别简 单: 在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先, 给每个消息指定一个全局唯一的ID就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高 可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现Bug。

比如说,对于同一条消息:“全局ID为8,操作为:给ID为666账户增加100元”,有可能出现这样的情况:

  • t0时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加100 元”;
  • t1时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A还未来得及更新消息执行状态。

这样就会导致账户被错误地增加了两次100元,这是一个在分布式系统中非常容易犯的错误,一定要引以为 戒。 对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务 还是分布式锁都是比较难解决问题。

小结

这节课我们主要介绍了通过幂等消费来解决消息重复的问题,然后我重点讲了几种实现幂等操作的方法,你可以利用数据库的约束来防止重复更新数据,也可以为数据更新设置一次性的前置条件,来防止重复消息,如果这两种方法都不适用于你的场景,还可以用“记录并检查操作”的方式来保证幂等,这种方法适用范围最广,但是实现难度和复杂度也比较高,一般不推荐使用。

这些实现幂等的方法,不仅可以用于解决重复消息的问题,也同样适用于,在其他场景中来解决重复请求或 者重复调用的问题。比如,我们可以将HTTP服务设计成幂等的,解决前端或者APP重复提交表单数据的问 题;也可以将一个微服务设计成幂等的,解决RPC框架自动重试导致的重复调用问题。这些方法都是通用 的,希望你能做到触类旁通,举一反三。


07-消息积压了该如何处理?

据我了解,在使用消息队列遇到的问题中,消息积压这个问题,应该是最常遇到的问题了,并且,这个问题 还不太好解决。

我们都知道,消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消 息,才会导致消息积压。

所以,我们先来分析下,在使用消息队列时,如何来优化代码的性能,避免出现消息积压。然后再来看看, 如果你的线上系统出现了消息积压,该如何进行紧急处理,最大程度地避免消息积压对业务的影响。

优化性能来避免消息积压

在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。 对于消息队列本身的性能,你作为使用者,不需要太关注。为什么这么说呢?

主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能 力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通 过水平扩展Broker的实例数成倍地提升处理能力。

而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求, 已经可以算是性能非常好的了。所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。

1. 发送端性能优化

发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑, 最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。

对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。为什么这么说呢?

我们之前的课程中讲过Producer发送消息的过程,Producer发消息给Broker,Broker收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是1ms,我们把这1ms的时间分解开,它包括了下面这些步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
  • 发送消息和返回响应在网络传输中的耗时;
  • Broker处理消息的时延。

如果是单线程发送,每次只发送1条消息,那么每秒只能发送 1000ms / 1ms * 1条/ms = 1000条 消息,这种 情况下并不能发挥出消息队列的全部实力。 无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送

还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便 就怎么实现。

比如说,你的消息发送端是一个微服务,主要接受RPC请求处理在线业务。很自然的,微服务在处理每次请 求的时候,就在当前线程直接发送消息就可以了,因为所有RPC框架都是多线程支持多并发的,自然也就实 现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响RPC服务的时延。 这种情况,比较明智的方式就是通过并发来提升发送性能。

如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?它不关心时延,更注重整个系统的 吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。

2. 消费端性能优化

使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度, 就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过 发送端的性能,那积压的消息是可以逐渐被消化掉的。

要是消费速度一直比生产速度慢,时间⻓了,整个系统就会出现问题,要么,消息队列的存储被填满无法提 供服务,要么消息丢失,这对于整个系统来说都是严重故障。

所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消 费性能。特别需要注意的一点是,在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫 队列)数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这 样的扩容实际上是没有效果的。

消息积压了该如何处理?

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是 某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因, 迅速解决问题才不至于影响业务。

导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,我们排查消息积压原因,是有一些相对固定而且比较有效的方法的。

能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。

大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的 消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是 通过扩容消费端的实例数来提升总体的消费能力。

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

还有一种不太常⻅的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变 化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情 况也会拖慢整个系统的消费速度。

如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志 是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

小结

这节课我们主要讨论了2个问题,一个是如何在消息队列的收发两端优化系统性能,提前预防消息积压。另 外一个问题是,当系统发生消息积压了之后,该如何处理。优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。

对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加Consumer的实例数量。

08-答疑解惑(一)-网关如何接收服务端的秒杀结果?

  1. 网关如何接收服务端的秒杀结果?

    public class RequestHandler {
    // ID生成器
    @Inject
    private IdGenerator idGenerator; // 消息队列生产者
    @Inject
    private Producer producer;
    // 保存秒杀结果的Map
    @Inject
    private Map<Long, Result> results;
    // 保存mutex的Map
    private Map<Long, Object> mutexes = new ConcurrentHashMap<>(); // 这个网关实例的ID
    @Inject
    private long myId;
    @Inject
    private long timeout;
    
    // 在这里处理APP的秒杀请求
    public Response onRequest(Request request) {
    // 获取一个进程内唯一的UUID作为请求id
        Long uuid = idGenerator.next();
        try {
            Message msg = composeMsg(request, uuid, myId);
            // 生成一个mutex,用于等待和通知
            Object mutex = new Object();
            mutexes.put(uuid, mutex)
    // 发消息
            producer.send(msg);
    // 等待后端处理
            synchronized (mutex) {
                mutex.wait(timeout);
            }
    
            // 查询秒杀结果
            Result result = results.remove(uuid);
    // 检查秒杀结果并返回响应
            if (null != result && result.success()) {
                return Response.success();
            }
        } catch (Throwable ignored) {
        } finally {
            mutexes.remove(uuid);
        }
    // 返回秒杀失败
        return Response.fail();
    }
    
    // 在这里处理后端服务返回的秒杀结果
    public void onResult(Result result) {
        Object mutex = mutexes.get(result.uuid());
        if (null != mutex) { // 如果查询不到,说明已经超时了,丢弃result即可。
    // 登记秒杀结果
            results.put(result.uuid(), result); // 唤醒处理APP请求的线程
            synchronized (mutex) {
                mutex.notify();
            }
        }
    }
    }
    
    
    

在这个方案中,网关在收到APP的秒杀请求后,直接给消息队列发消息。至于消息的内容,并不一定是APP 请求的Request,只要包含足够的字段就行了,比如用戶ID、设备ID、请求时间等等。另外,还需要包含这 个请求的ID和网关的ID,这些后面我们会用到。

如果发送消息失败,可以直接给APP返回秒杀失败结果,成功发送消息之后,线程就阻塞等待秒杀结果。这里面不可能无限等待下去,需要设定一个等待的超时时间。

等待结束之后,去存放秒杀结果的Map中查询是否有返回的秒杀结果,如果有就构建Response,给APP返 回秒杀结果,如果没有,按秒杀失败处理。

这是处理APP请求的线程,接下来我们来看一下,网关如何来接收从后端秒杀服务返回的秒杀结果。 我们可以选择用RPC的方式来返回秒杀结果,这里网关节点是RPC服务端,后端服务为客戶端。之前网关发

出去的消息中包含了网关的ID,后端服务可以通过这个网关ID来找到对应的网关实例,秒杀结果中需要包含 请求ID,这个请求ID也是从消息中获取的。

网关收到后端服务的秒杀结果之后,用请求ID为Key,把这个结果保存到秒杀结果的Map中,然后通知对应 的处理APP请求的线程,结束等待。我刚刚说过,处理APP请求的线程,在结束等待之后,会去秒杀的结果 Map中查询这个结果,然后再给APP返回响应。

这个解决方案还不是一个性能最优的方案,处理APP请求的线程需要同步等待秒杀结果。后面课程中我们会 专⻔来讲,如何使用异步方式来提升程序的性能。

2. 详解RocketMQ和Kafka的消息模型

假设有一个主题MyTopic,我们为主题创建5个队列,分布到2个Broker中。

/img/geektime-mq-broker.png

先说消息生产这一端,假设我们有3个生产者实例:Produer0,Produer1和Producer2。

这3个生产者是如何对应到2个Broker的,又是如何对应到5个队列的呢?这个很简单,不用对应,随便发。 每个生产者可以在5个队列中轮询发送,也可以随机选一个队列发送,或者只往某个队列发送,这些都可 以。比如Producer0要发5条消息,可以都发到队列Q0里面,也可以5个队列每个队列发一条。

然后说消费端,很多同学没有搞清楚消费组、消费者和队列这几个概念的对应关系。

每个消费组就是一份订阅,它要消费主题MyTopic下,所有队列的全部消息。注意,队列里的消息并不是消费掉就没有了,这里的“消费”,只是去队列里面读了消息,并没有删除,消费完这条消息还是在队列里面。

多个消费组在消费同一个主题时,消费组之间是互不影响的。比如我们有2个消费组:G0和G1。G0消费了 哪些消息,G1是不知道的,也不用知道。G0消费过的消息,G1还可以消费。即使G0积压了很多消息,对 G1来说也没有任何影响。

然后我们再说消费组的内部,一个消费组中可以包含多个消费者的实例。比如说消费组G1,包含了2个消费 者C0和C1,那这2个消费者又是怎么和主题MyTopic的5个队列对应的呢?

由于消费确认机制的限制,这里面有一个原则是,在同一个消费组里面,每个队列只能被一个消费者实例占用。至于如何分配,这里面有很多策略,我就不展开说了。总之保证每个队列分配一个消费者就行了。比 如,我们可以让消费者C0消费Q0,Q1和Q2,C1消费Q3和Q4,如果C0宕机了,会触发重新分配,这时候C1同时消费全部5个队列。

再强调一下,队列占用只是针对消费组内部来说的,对于其他的消费组来说是没有影响的。比如队列Q2被 消费组G1的消费者C1占用了,对于消费组G2来说,是完全没有影响的,G2也可以分配它的消费者来占用和 消费队列Q2。 最后说一下消费位置,每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在 服务端保存,并且,消费位置和消费者是没有关系的。每个消费位置一般就是一个整数,记录这个消费组 中,这个队列消费到哪个位置了,这个位置之前的消息都成功消费了,之后的消息都没有消费或者正在消费。

3. 如何实现单个队列的并行消费?

下面说一下《03 | 消息模型:主题和队列有什么区别?》这节课的思考题:如果不要求严格顺序,如何实现 单个队列的并行消费?关于这个问题,有很多的实现方式,在JMQ(京东自研的消息队列产品)中,它实现 的思路是这样的。 比如说,队列中当前有10条消息,对应的编号是0-9,当前的消费位置是5。同时来了三个消费者来拉消息, 把编号为5、6、7的消息分别给三个消费者,每人一条。过了一段时间,三个消费成功的响应都回来了,这 时候就可以把消费位置更新为8了,这样就实现并行消费。

这是理想的情况。还有可能编号为6、7的消息响应回来了,编号5的消息响应一直回不来,怎么办?这个位置5就是一个消息空洞。为了避免位置5把这个队列卡住,可以先把消费位置5这条消息,复制到一个特殊重 试队列中,然后依然把消费位置更新为8,继续消费。再有消费者来拉消息的时候,优先把重试队列中的那 条消息给消费者就可以了。

这是并行消费的一种实现方式。需要注意的是,并行消费开销还是很大的,不应该作为一个常规的,提升消 费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

4. 如何保证消息的严格顺序?

很多同学在留言中问,怎么来保证消息的严格顺序?我们多次提到过,主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。

大部分情况下,我们并不需要全局严格顺序,只要保证局部有序就可以满足要求了。比如,在传递账戶流水 记录的时候,只要保证每个账戶的流水有序就可以了,不同账戶之间的流水记录是不需要保证顺序的。 如果需要保证局部严格顺序,可以这样来实现。在发送端,我们使用账戶ID作为Key,采用一致性哈希算法 计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同Key的消息总是发送到同一个队列 上,这样可以保证相同Key的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法 来计算队列编号。

写在最后

在留言中,很多同学留言提出来,能不能讲一讲某个消息队列的某个功能具体如何配置。我的建议是,你先 不要太关注功能、API和配置这些细节,在学习如何使用消息队列的过程中,要保持一定的高度来学习。

因为使用消息队列,大部分的难点在宏观架构层面,要解决这些难点,你需要掌握消息队列宏观层面上的实现原理和最佳实践,这样,无论你使用什么消息队列,都可以做到游刃有余。在选定了合适的消息队列产品,准备写代码之前,再去文档中查看这些细节都来得及。

所以,我们专栏的“基础篇”讲消息队列的使用,更多讲的是一些通用的原理。这节课是我们消息队列高手 课“基础篇”的最后一节课,完整基础篇的学习后,意味着你已经是一个使用消息队列的小达人了。 在“进阶篇”中,我们将把学习重点从“如何使用”转为“如何实现”,在学习消息队列的实现技术时,你 反而要专注到每一个技术点上,深入下去,把每个细节都要搞清楚、学透。课程的深度、难度也会逐步加 强,当然你获得的经验值也会更多。

09-学习开源代码该如何入手?

对于很多开源软件来说,如果我们把它作为我们业务系统的重要组成部分之一,真正地用 于生产,仅仅知道如何使用是远远不够的,你必须掌握它的实现原理和很多细节,这样才能找到最佳的使用 姿势,当你的系统出现问题时,你才有可能基于它的实现原理,再根据一些现象来排查问题原因。

掌握这些开源软件的最佳方式就是去学习它的源代码。很多同学跟我说:“我也很想去看一些开源软件的代 码,也尝试去看过,但是面对上千个源码文件,几十万行代码,完全不知道从哪儿入手啊。” 这节课我们就针对这个情况来聊一聊,学习开源软件的代码该如何入手。

有一点我提前说明一下,对于这节课里面涉及到的一些名词,我会直接使用英文,主要目的是方便你直接对 应到那些开源软件英文官网上的标题。 通过文档来了解开源项目 学习源代码应该从哪儿入手呢? 最佳的方式就是先看它的文档。

通过看文档,你可以快速地掌握这个软件整体的结构,它有哪些功能特性,它涉及到的关键技术、实现原理 和它的生态系统等等。在掌握了这些之后,你对它有个整体的了解,然后再去看它的源代码,就不会再有那 种盲人摸象找不到头绪的感觉了。

首先强调一点是,你必须去看这些开源软件官方网站上的文档,尽量不要去网上搜一些翻译的中文文档。为什么呢?

因为这些开源软件,特别是一些社区活跃的软件,它的迭代是很快的,即使是自带官方中文翻译的项目,它的中文文档很多都会落后于英文版,你能看到的中文版本很多时候都已经过时了。那非官方的翻译,问题可 能就不止是过时的问题了,可能还会出现一些错漏的地方。所以,最好还是直接来看官方的英文文档。

如果说你的英文阅读水平确实有限,直接阅读英文文档有困难或者看得非常慢,怎么办?你还是要按照我接 下来告诉你的方法去看它的英文官网,即使阅读大段的技术文章有困难,网站的标题你总能看懂吧?找到你 需要阅读的文章后,你可以在网上搜一下对应的中文版本,先看一遍中文版,然后再对着英文原版过一遍, 弥补中文版可能过时或翻译不准确的问题。

开源社区经过这么多年的发展,它已经形成一个相对比较成熟的文化。每个开源软件,代码如何管理、社区 成员如何沟通、如何协作这些都已经形成了一个比较固定的套路。大多数开源软件,它的官网和技术文档也 是有一个相对比较固定的结构的。

接下来我们以Kafka的官网为例子,来说下怎么来看它的文档。

如果说你对这个项目完全不了解,没用过这个软件,你首先需要看的文档是 Quick Start,按照Quick Start 中的指导快速把它的环境搭起来,把它运行起来,这样你会对这个项目有个感性认识,也便于你在后续深入 学习的时候“跑”一些例子。

然后你需要找一下它的 Introduction,一般里面会有项目的基本介绍。这里面很重要的一点是,你需要找到 这个项目用到的一些基本概念或者名词的介绍文档,在Kafka的文档中,这些内容就在Introduction里面, 比如Topic、Producer、 Consumer、Partition这些概念在Kafka中代表的含义。

有些开源项目会单独有一个 Basic Concepts文档来讲这些基础概念。这个文档非常重要,因为这些开源社区 的开发者都有个很不好的爱好:发明概念。很多开源项目都会自己创造一些名词或者概念,了解这些基本概念才有可能看懂它项目的其他文档。

对项目有个基本的了解之后呢,接下来你可以看一下它的使用场景、功能特性以及相关的生态系统的介绍。 在Kafka中功能相关的内容在 Use casesEcoSystem两篇文章中,有些项目中会有类似名为Features的文档 介绍功能和特性。

其中项目的生态系统,也就是EcoSystem,一般会介绍它这个项目适用的一些典型的使用场景,在某个场景 下适合与哪些其他的系统一起来配合使用等。如果说你的系统不是特别特殊或者说冷⻔的话,你大概率可以 在EcoSystem里面找到和你类似的场景,可以少走很多的弯路。

你在读完上面这些文档之后,对这个项目的整体应该会有一个比较全面的了解了,比如说: - 这个项目是干什么的? - 能解决哪些问题? - 适合在哪些场景使用? - 有哪些功能? - 如何使用?

对这些问题有一个初步的答案之后,接下来你就可以去深入学习它的实现原理了。这是不是意味着,你可以 立即去看它的源码呢?这样做或许可行,但并不是最好的方法。

你知道大部分开源项目都是怎么诞生的吗?一般来说是这样的:某个大学或者大厂的科学家,某天脑海里突然出现了一个改变世界的想法,科学家们会基于这个想法做一些深入的研究,然后写了一篇论文在某个学术 期刊或者会议上发表。论文发表后在业内获得很多的赞,这时候就轮到像Google、Facebook这样的大厂出 手了:这个论文很有价值,不如我们把它实现出来吧?一个开源项目就这样诞生了。

所以,对于这样的开源项目,它背后的这篇论文就是整个项目的灵魂,你如果能把这篇论文看完并且理解透了,这个项目的实现原理也就清楚了。

对于Kafka来说,它的灵魂是这篇博文:The Log: What every software engineer should know about real- time data’s unifying abstraction,对应的中文译稿在这里:《日志:每个软件工程师都应该知道的有关 实时数据的统一抽象》。

这篇博文被评为程序员史诗般必读文章,无论你是不是想了解Kafka的实现原理,我都强烈推荐你好好读一下上面这篇博文。

学习完项目灵魂,就可以开始阅读源码了。

用以点带面的方式来阅读源码

需要注意的是,你在读源码的时候,千万不要上来就找main方法这样泛泛地去看,为什么?你可以想一 下,一篇文章,它是一个线性结构,你从前往后读就行了。一本书呢?如果我们看目录的话,可以认为是个 树状结构,但大多数的书的内容还是按照线性结构来组织的,你可以从前往后读,也可以通过目录跳着读。

那程序的源代码是什么结构?那是一个网状结构,关系错综复杂,所以这种结构是非常不适合人类去阅读 的。你如果是泛泛去读源代码,很容易迷失在这个代码织成的网里面。那怎么办?

我推荐大家阅读源码的方式是,带着问题去读源码,最好是带着问题的答案去读源码。你每次读源码之前, 确定一个具体的问题,比如:

  • RocketMQ的消息是怎么写到文件里的?
  • Kafka的Coordinator是怎么维护消费位置的?

类似这种非常细粒度的问题,粒度细到每个问题的答案就是一两个流程就可以回答,这样就可以了。如果说 你就想学习一下源代码,或者说提不出这些问题怎么办呢?答案还是,看文档

确定问题后,先不要着急看源代码,而是应该先找一下是否有对应的实现文档,一般来说,核心功能都会有 专⻔的文档来说明它的实现原理,比如在Kafka的文档中,DESIGNIMPLEMENTATION两个章节中,介绍 了Kafka很多功能的实现原理和细节。一些更细节的非核心的功能不一定有专⻔的文档来说明,但是我们可 以去找一找是否有对应的Improvement Proposal。(Kafka的所有Improvement Proposals在这里。)

这个Improvement Proposal是什么呢?你可以认为它是描述一个新功能的文档,一般开源项目需要增加一 个新的功能或者特性的时候,都会创建一个Improvement Proposal,一般标题都是”xIP-新功能名称”,其 中IP就是Improvement Proposal的缩写,x一般就是这个开源项目的名称的首字母,比如Kafka中 Improvement Proposal的标题就都是以KIP来开头。

每个Improvement Proposal都是有固定格式的,一般要说明为什么需要增加这个功能,会对系统产生那些 影响和改变,还有我们最关心的设计和实现原理的简述。

你读完讲解实现的文档再去看源代码,也就是我刚刚说的,不只是带着问题去读,而是带着答案去读源码。 这样你在读源码的时候,不仅仅是更容易理解源代码,还可以把更多的精力放在一些实现细节上,这样阅读 源码的效果会更好。 使用这种以问题为阅读单元的方式来读源代码,你每次只要花很短的时间,阅读很少的一部分源码,就能解 决一个问题,得到一些收获。这种方式其实是通过一个一个的问题,在网状的源代码中,每次去读几个点组 成的那一两条线。随着你通过阅读源码了解的问题越来越多,你对项目源码的理解也会越来越全面和深入。

小结

如果你想了解一个开源项目,学习它的代码,最佳的切入点就是去读它的官方文档,这些文档里面,最重要 的灵魂就是项目背后的那篇论文,它一般是这个开源项目的理论基础。

在阅读源码的时候呢,最佳的方式是带着问题去阅读,最好是带着问题的答案去读,这样难度低、周期短、 收获快。不要想着一定要从总体上去全面掌握一个项目的所有源代码,也没有必要。