消息队列学习笔记
目录
公号:码农充电站pro
1,消息队列的适用场景
消息队列适合处理的问题:
- 异步处理:让系统可以快速的响应用户
- 流量控制:避免过多的请求压垮系统
- 服务解耦
- 等
2,流行的消息队列产品
流行的消息队列产品:
- RabbitMQ:是由 Erlang 语言编写的,是一个相当轻量级的消息队列,非常容易部署和使用。RabbitMQ 的客户端支持的编程语言大概是所有消息队列中最多的。
- 它的缺点 1:对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
- 它的缺点 2:RabbitMQ 的性能是我们介绍的这几个消息队列中最差的。
- RocketMQ:它是阿里巴巴在 2012 年开源的消息队列产品,由 Java 语言开发,后来捐赠给 Apache 软件基金会,2017 成为 Apache 的顶级项目。
- RocketMQ 的性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息。
- Kafka:它使用 Scala 和 Java 语言开发,也是 Apache 的顶级项目,能够处理海量数据。
- 当下的 Kafka 已经发展为一个非常成熟的消息队列产品,无论在数据可靠性、稳定性和功能特性等方面都可以满足绝大多数场景的需求。
- Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。
3,主题与队列的区别
队列最原始的定义是一种先进先出的数据结构。
主题用在发布订阅模型中,主题与队列最大的不同是:一份消息数据能不能被消费多次。
一个主题支持多个订阅者订阅,一份消息可以被消费多次。
现代的消息队列产品使用的消息模型大多是这种发布 - 订阅模型,当然也有例外。
1,RabbitMQ 的消息模型
RabbitMQ 是少数依然坚持使用队列模型的产品之一。
如果 RabbitMQ 想达到一份消息可被多个消费者消费的目的,需要对 Exchange 进行配置,将一份消息重复放入多个队列中,可参考这里。
2,RocketMQ 的消息模型
RocketMQ 使用的消息模型是发布 - 订阅模型。
一般一个消费者会对应一个队列。
RocketMQ 中的每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。
- 每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
- 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
- 在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。
4,Kafka 的消息模型
Kafka 的消息模型和 RocketMQ 是完全一样的,所有 RocketMQ 中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka。唯一的区别是,在 Kafka 中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。
4,消息队列如何实现分布式事务
比较常见的分布式事务实现有 :
- 2PC(Two-phase Commit,也叫二阶段提交)
- TCC(Try-Confirm-Cancel)
- 事务消息
事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。
事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。
以订单和购物车这个例子(订单系统要往订单库和消息队列发布消息,要保证消息的一致性,即要么都发送成功,要么都发送失败,不能仅有一个发送成功),来看下如何用消息队列来实现分布式事务。
- 订单系统在消息队列上开启一个事务。
- 订单系统给消息服务器发送一个“半消息”
- 这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
- 半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。
- 根据本地事务的执行结果决定提交或者回滚事务消息。
- 如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。
- 如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。
如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。
我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单。RocketMQ 则给出了另外一种解决方案。
RocketMQ 中的分布式事务实现
RocketMQ 中增加了事务反查机制来解决事务消息提交失败的问题。
如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。
在我们这个例子中,只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。
5,使用消息队列的三个问题
1,如何确保消息不丢失
一条消息从生产到消费完成这个过程,可以划分三个阶段,只有保证消息在这三个阶段不丢失,那么消息就不会丢失。
在生产阶段,消息队列通过请求确认机制,来保证消息的可靠传递。只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
以 Kafka 为例,来看下正确发送消息的代码:
// 同步发送消息
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
// 异步发送消息
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
在消息存储阶段:
- 对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应
- 对于多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。
在消费阶段,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
2,如何处理重复消息
在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。而消息队列本身很难保证消息不重复,所以一般需要在业务代码中处理重复消息。
一般解决重复消息的办法是,在消费端,让消费消息的操作具备幂等性:其任意多次执行所产生的影响均与一次执行的影响相同。
比如,“将账户 X 的余额设置为 100 元”这个操作就是幂等的,“将账户 X 的余额加 100 元”就不是幂等的。可以这样改造:“如果账户 X 当前的余额为 500 元,将余额加 100 元”。
3,如何处理消息积压问题
消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。
如何避免消息积压
处理消息积压的一种方法是避免消息积压,即对消息队列的优化。
- 发送端优化:需要设置合适的并发和批量大小,就可以达到很好的发送性能
- 消费端优化:只有保证消费端的消费性能高于生产端的发送性能,这样的系统才能健康的持续运行。
- 通过水平扩容,增加消费端的并发数来提升总体的消费性能。
- 在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。
- 如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
消息积压了该如何处理?
不管如何处理,消息积压还是有可能发生的。
导致积压突然增加的原因,只有两种:要么是发送变快了,要么是消费变慢了。
快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。
6,如何实现高性能
1,异步设计
将同步操作优化成异步操作:
- 同步实现的方式,整个服务器的所有线程大部分时间都没有在工作,而是都在等待。
- 异步的方式大量使用了回调函数。
- 当要执行一项比较耗时的操作时,不去等待操作结束,而是给这个操作一个命令:“当操作完成后,接下来去执行什么”
- 异步方式可以减少/避免线程等待,只用很少的线程就可以达到超高的吞吐能力。
- 相比于同步实现,异步实现的复杂度要大很多,代码的可读性和可维护性都会显著的下降。
例如下面的同步代码(一个转账函数),优化成异步代码:
// 同步代码
Transfer(accountFrom, accountTo, amount) {
// 先从accountFrom的账户中减去相应的钱数
Add(accountFrom, -1 * amount)
// 再把减去的钱数加到accountTo的账户中
Add(accountTo, amount)
return OK
}
// 异步代码,涉及到回调方法
TransferAsync(accountFrom, accountTo, amount, OnComplete()) {
// 异步从accountFrom的账户中减去相应的钱数,然后调用OnDebit方法。
AddAsync(accountFrom, -1 * amount, OnDebit(accountTo, amount, OnAllDone(OnComplete())))
}
// 扣减账户accountFrom完成后调用
OnDebit(accountTo, amount, OnAllDone(OnComplete())) {
// 再异步把减去的钱数加到accountTo的账户中,然后执行OnAllDone方法
AddAsync(accountTo, amount, OnAllDone(OnComplete()))
}
// 转入账户accountTo完成后调用
OnAllDone(OnComplete()) {
OnComplete()
}
Java 中比较常用的异步框架有:
- Java8 内置的 CompletableFuture 类,简单易用
- ReactiveX 的 RxJava,功能更加强大
2,异步(非阻塞)网络传输
一个 TCP 连接建立后,用户代码会获得一个用于收发数据的通道,每个通道会在内存中开辟两片区域用于收发数据的缓存。
- 对于数据发送:用户代码在发送时写入的数据会暂存在缓存中,然后操作系统会通过网卡,把发送缓存中的数据传输到对端的服务器上。
- 只要这个缓存不满,或者,发送数据的速度没有超过网卡传输速度的上限,那这个发送数据的操作耗时,只不过是一次内存写入的时间,这个时间是非常快的。所以,发送数据的时候同步发送就可以了,没有必要异步。
- 对于数据接收:因为不知道什么时候数据才会到来,如果使用同步的方法,就会造成阻塞,因此需要使用异步的方式。
在 Java 中,大名鼎鼎的 Netty 就是一个异步网络框架。
Java NIO 是一个更加底层的异步网络框架,Netty 也是基于它的。
关于JAVA的网络,有个比喻形式的总结:有一个养鸡的农场,里面养着来自各个农户(Thread)的鸡(Socket),每家农户都在农场中建立了自己的鸡舍(SocketChannel):
- 1、BIO:Block IO,每个农户盯着自己的鸡舍,一旦有鸡下蛋,就去做捡蛋处理
- 2、NIO:No-Block IO-单Selector,农户们花钱请了一个饲养员(Selector),并告诉饲养员(register)如果哪家的鸡有任何情况(下蛋)均要向这家农户报告(select keys)
- 3、NIO:No-Block IO-多Selector,当农场中的鸡舍逐渐增多时,一个饲养员巡视(轮询)一次所需时间就会不断地加长,这样农户知道自己家的鸡有下蛋的情况就会发生较大的延迟。怎么解决呢?没错,多请几个饲养员(多Selector),每个饲养员分配管理鸡舍,这样就可以减轻一个饲养员的工作量,同时农户们可以更快的知晓自己家的鸡是否下蛋了
- 4、Epoll模式:如果采用Epoll方式,农场问题应该如何改进呢?其实就是饲养员不需要再巡视鸡舍,而是听到哪间鸡舍的鸡打鸣了(活跃连接),就知道哪家农户的鸡下蛋了;
- 5、AIO:Asynchronous I/O, 鸡下蛋后,以前的NIO方式要求饲养员通知农户去取蛋,AIO模式出现以后,事情变得更加简单了,取蛋工作由饲养员自己负责,然后取完后,直接通知农户来拿即可,而不需要农户自己到鸡舍去取蛋。
3,序列化与反序列化
要想使用网络框架的 API 来传输结构化的数据,必须得先实现结构化的数据与字节流之间的双向转换。这种将结构化数据转换成字节流的过程,我们称为序列化,反过来转换,就是反序列化。
有很多通用的序列化实现:
- Java 和 Go 语言都内置了序列化实现
- 一些流行的开源序列化实现,比如,Google 的 Protobuf、Kryo、Hessian 等
- 此外,像 JSON、XML 这些标准的数据格式,也可以作为一种序列化实现来使用
4,如何更好的进行内存管理
现代的编程语言(比如 Java,Go 等),大多采用自动内存管理机制,虚拟机会不定期执行垃圾回收,自动释放不再使用的内存,但是执行垃圾回收的过程会导致进程暂停。
在高并发的场景下,会产生大量的待回收的对象,需要频繁地执行垃圾回收,导致程序长时间暂停,程序看起来就像卡死了一样。
为了缓解这个问题,我们需要尽量少地使用一次性对象,对于需要频繁使用,占用内存较大的一次性对象,可以考虑自行回收并重用这些对象,来减轻垃圾回收的压力。
5,Kafka 如何实现高性能
除了上面几节介绍到的一些通用的性能优化方法,Kafka 还使用了下面几个关键的技术点:
- 使用批量处理的方式来提升系统吞吐能力。
- 在客户端,当调用 send() 方法发送一条消息之后,无论你是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去。它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。
- 在服务端,Kafka 不会把一批消息再还原成多条消息,再一条一条地处理(这样太慢了)。Kafka 会把每个批消息当做一个“批消息”来处理。在 Broker 整个处理流程中,无论是写入磁盘、从磁盘读出来、还是复制到其他副本这些流程中,批消息都不会被解开,一直是作为一条“批消息”来进行处理。
- 在消费时,消息同样是以批为单位进行传递的,Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理。
- 构建批消息和解开批消息分别在发送端和消费端的客户端完成,不仅减轻了 Broker 的压力,最重要的是减少了 Broker 处理请求的次数,提升了总体的处理能力。
- 基于磁盘文件高性能顺序读写的特性来设计的存储结构。
- Kafka 充分利用了磁盘的这个特性。它的存储设计非常简单,对于每个分区,它把从 Producer 收到的消息,顺序地写入对应的 log 文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。
- 利用操作系统的 PageCache 来缓存数据,减少 IO 并提升读性能。
- 无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
- 一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;
- 另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
- 应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。
- 无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
- 使用零拷贝技术加速消费流程
- 通常在服务器上进行“读取文件,并通过网络发送出去”,要经过下面步骤:
- 从文件复制数据到 PageCache 中(如果命中 PageCache,这一步可以省掉)
- 从 PageCache 复制到应用程序的内存空间中(也就是我们可以操作的对象所在的内存)
- 从应用程序的内存空间复制到 Socket 的缓冲区(这就是调用网络 API 发送数据的过程)
- 零拷贝就是:上面的 2、3 步骤两次复制合并成一次复制。
- 直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。
- 通常在服务器上进行“读取文件,并通过网络发送出去”,要经过下面步骤:
零拷贝 API:
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
6,如何正确使用锁
使用锁的第一条原则:如果能不用锁,就不用锁。
只有在并发环境中,共享资源不支持并发访问,或者说并发访问共享资源会导致系统错误的情况下,才需要使用锁。
在 Java 中使用锁的示例:
// 使用锁
private Lock lock = new ReentrantLock();
public void visitShareResWithLock() {
lock.lock();
try {
// 在这里安全的访问共享资源
} finally {
lock.unlock();
}
}
// 使用同步代码块
private Object lock = new Object();
public void visitShareResWithLock() {
synchronized (lock) {
// 在这里安全的访问共享资源
}
}
使用读写锁提高性能,Java 示例:
ReadWriteLock rwlock = new ReentrantReadWriteLock();
public void read() {
rwlock.readLock().lock();
try {
// 在这儿读取共享数据
} finally {
rwlock.readLock().unlock();
}
}
public void write() {
rwlock.writeLock().lock();
try {
// 在这儿更新共享数据
} finally {
rwlock.writeLock().unlock();
}
}
读写锁的特点:
- 读访问可以并发执行。
- 写的同时不能并发读,也不能并发写。
7,使用硬件同步原语替代锁
硬件同步原语是由计算机硬件提供的一组原子操作,比较常用的原语主要是:
- CAS(Compare and Swap):意思是先比较,再交换
- FAA(Fetch and Add):
7,实现一个 RPC 框架
https://github.com/liyue2008/simple-rpc-framework
文章作者 @码农加油站
上次更改 2022-01-18