公号:码农充电站pro

主页:https://codeshellme.github.io

1,消息队列的适用场景

消息队列适合处理的问题:

  • 异步处理:让系统可以快速的响应用户
  • 流量控制:避免过多的请求压垮系统
  • 服务解耦

2,流行的消息队列产品

流行的消息队列产品:

  • RabbitMQ:是由 Erlang 语言编写的,是一个相当轻量级的消息队列,非常容易部署和使用。RabbitMQ 的客户端支持的编程语言大概是所有消息队列中最多的。
    • 它的缺点 1:对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
    • 它的缺点 2:RabbitMQ 的性能是我们介绍的这几个消息队列中最差的。
  • RocketMQ:它是阿里巴巴在 2012 年开源的消息队列产品,由 Java 语言开发,后来捐赠给 Apache 软件基金会,2017 成为 Apache 的顶级项目。
    • RocketMQ 的性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息。
  • Kafka:它使用 Scala 和 Java 语言开发,也是 Apache 的顶级项目,能够处理海量数据。
    • 当下的 Kafka 已经发展为一个非常成熟的消息队列产品,无论在数据可靠性、稳定性和功能特性等方面都可以满足绝大多数场景的需求。
    • Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。

3,主题与队列的区别

队列最原始的定义是一种先进先出的数据结构。

在这里插入图片描述

主题用在发布订阅模型中,主题与队列最大的不同是:一份消息数据能不能被消费多次

在这里插入图片描述

一个主题支持多个订阅者订阅,一份消息可以被消费多次。

现代的消息队列产品使用的消息模型大多是这种发布 - 订阅模型,当然也有例外。

1,RabbitMQ 的消息模型

RabbitMQ 是少数依然坚持使用队列模型的产品之一。

在这里插入图片描述

如果 RabbitMQ 想达到一份消息可被多个消费者消费的目的,需要对 Exchange 进行配置,将一份消息重复放入多个队列中,可参考这里

2,RocketMQ 的消息模型

RocketMQ 使用的消息模型是发布 - 订阅模型。

在这里插入图片描述

一般一个消费者会对应一个队列。

RocketMQ 中的每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。

  • 每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
  • 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
  • 在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

4,Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是完全一样的,所有 RocketMQ 中对应的概念,和生产消费过程中的确认机制,都完全适用于 Kafka。唯一的区别是,在 Kafka 中,队列这个概念的名称不一样,Kafka 中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

4,消息队列如何实现分布式事务

比较常见的分布式事务实现有 :

  • 2PC(Two-phase Commit,也叫二阶段提交)
  • TCC(Try-Confirm-Cancel)
  • 事务消息

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能

以订单和购物车这个例子(订单系统要往订单库和消息队列发布消息,要保证消息的一致性,即要么都发送成功,要么都发送失败,不能仅有一个发送成功),来看下如何用消息队列来实现分布式事务。

在这里插入图片描述

  • 订单系统在消息队列上开启一个事务。
  • 订单系统给消息服务器发送一个“半消息”
    • 这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
  • 半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。
  • 根据本地事务的执行结果决定提交或者回滚事务消息。
    • 如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。
    • 如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。

如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。

我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单。RocketMQ 则给出了另外一种解决方案。

RocketMQ 中的分布式事务实现

在这里插入图片描述

RocketMQ 中增加了事务反查机制来解决事务消息提交失败的问题。

如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务

为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

在我们这个例子中,只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

5,使用消息队列的三个问题

1,如何确保消息不丢失

一条消息从生产到消费完成这个过程,可以划分三个阶段,只有保证消息在这三个阶段不丢失,那么消息就不会丢失。

在这里插入图片描述

生产阶段,消息队列通过请求确认机制,来保证消息的可靠传递。只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

以 Kafka 为例,来看下正确发送消息的代码:

// 同步发送消息
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功。");
} catch (Throwable e) {
    System.out.println("消息发送失败!");
    System.out.println(e);
}

// 异步发送消息
producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println("消息发送成功。");
    } else {
        System.out.println("消息发送失败!");
        System.out.println(exception);
    }
});

在消息存储阶段:

  • 对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应
  • 对于多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。

在消费阶段,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

2,如何处理重复消息

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能会产生重复的消息。而消息队列本身很难保证消息不重复,所以一般需要在业务代码中处理重复消息。

一般解决重复消息的办法是,在消费端,让消费消息的操作具备幂等性其任意多次执行所产生的影响均与一次执行的影响相同

比如,“将账户 X 的余额设置为 100 元”这个操作就是幂等的,“将账户 X 的余额加 100 元”就不是幂等的。可以这样改造:“如果账户 X 当前的余额为 500 元,将余额加 100 元”。

3,如何处理消息积压问题

消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。

如何避免消息积压

处理消息积压的一种方法是避免消息积压,即对消息队列的优化。

  • 发送端优化:需要设置合适的并发批量大小,就可以达到很好的发送性能
  • 消费端优化:只有保证消费端的消费性能高于生产端的发送性能,这样的系统才能健康的持续运行。
    • 通过水平扩容,增加消费端的并发数来提升总体的消费性能。
    • 在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。
    • 如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的,因为对于消费者来说,在每个分区上实际上只能支持单线程消费

消息积压了该如何处理?

不管如何处理,消息积压还是有可能发生的。

导致积压突然增加的原因,只有两种:要么是发送变快了,要么是消费变慢了

快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。

6,如何实现高性能

1,异步设计

将同步操作优化成异步操作:

  • 同步实现的方式,整个服务器的所有线程大部分时间都没有在工作,而是都在等待。
  • 异步的方式大量使用了回调函数
    • 当要执行一项比较耗时的操作时,不去等待操作结束,而是给这个操作一个命令:“当操作完成后,接下来去执行什么
    • 异步方式可以减少/避免线程等待,只用很少的线程就可以达到超高的吞吐能力。
    • 相比于同步实现,异步实现的复杂度要大很多,代码的可读性和可维护性都会显著的下降。

例如下面的同步代码(一个转账函数),优化成异步代码:

// 同步代码
Transfer(accountFrom, accountTo, amount) {
  // 先从accountFrom的账户中减去相应的钱数
  Add(accountFrom, -1 * amount)
  // 再把减去的钱数加到accountTo的账户中
  Add(accountTo, amount)
  return OK
}

// 异步代码,涉及到回调方法
TransferAsync(accountFrom, accountTo, amount, OnComplete()) {
  // 异步从accountFrom的账户中减去相应的钱数,然后调用OnDebit方法。
  AddAsync(accountFrom, -1 * amount, OnDebit(accountTo, amount, OnAllDone(OnComplete())))
}
// 扣减账户accountFrom完成后调用
OnDebit(accountTo, amount, OnAllDone(OnComplete())) {
  //  再异步把减去的钱数加到accountTo的账户中,然后执行OnAllDone方法
  AddAsync(accountTo, amount, OnAllDone(OnComplete()))
}
// 转入账户accountTo完成后调用
OnAllDone(OnComplete()) {
  OnComplete()
}

Java 中比较常用的异步框架有:

2,异步(非阻塞)网络传输

一个 TCP 连接建立后,用户代码会获得一个用于收发数据的通道,每个通道会在内存中开辟两片区域用于收发数据的缓存。

  • 对于数据发送:用户代码在发送时写入的数据会暂存在缓存中,然后操作系统会通过网卡,把发送缓存中的数据传输到对端的服务器上。
    • 只要这个缓存不满,或者,发送数据的速度没有超过网卡传输速度的上限,那这个发送数据的操作耗时,只不过是一次内存写入的时间,这个时间是非常快的。所以,发送数据的时候同步发送就可以了,没有必要异步
  • 对于数据接收:因为不知道什么时候数据才会到来,如果使用同步的方法,就会造成阻塞,因此需要使用异步的方式。

在 Java 中,大名鼎鼎的 Netty 就是一个异步网络框架。

Java NIO 是一个更加底层的异步网络框架,Netty 也是基于它的。

关于JAVA的网络,有个比喻形式的总结:有一个养鸡的农场,里面养着来自各个农户(Thread)的鸡(Socket),每家农户都在农场中建立了自己的鸡舍(SocketChannel)

  • 1、BIO:Block IO,每个农户盯着自己的鸡舍,一旦有鸡下蛋,就去做捡蛋处理
  • 2、NIO:No-Block IO-单Selector,农户们花钱请了一个饲养员(Selector),并告诉饲养员(register)如果哪家的鸡有任何情况(下蛋)均要向这家农户报告(select keys)
  • 3、NIO:No-Block IO-多Selector,当农场中的鸡舍逐渐增多时,一个饲养员巡视(轮询)一次所需时间就会不断地加长,这样农户知道自己家的鸡有下蛋的情况就会发生较大的延迟。怎么解决呢?没错,多请几个饲养员(多Selector),每个饲养员分配管理鸡舍,这样就可以减轻一个饲养员的工作量,同时农户们可以更快的知晓自己家的鸡是否下蛋了
  • 4、Epoll模式:如果采用Epoll方式,农场问题应该如何改进呢?其实就是饲养员不需要再巡视鸡舍,而是听到哪间鸡舍的鸡打鸣了(活跃连接),就知道哪家农户的鸡下蛋了;
  • 5、AIO:Asynchronous I/O, 鸡下蛋后,以前的NIO方式要求饲养员通知农户去取蛋,AIO模式出现以后,事情变得更加简单了,取蛋工作由饲养员自己负责,然后取完后,直接通知农户来拿即可,而不需要农户自己到鸡舍去取蛋。

3,序列化与反序列化

要想使用网络框架的 API 来传输结构化的数据,必须得先实现结构化的数据与字节流之间的双向转换。这种将结构化数据转换成字节流的过程,我们称为序列化,反过来转换,就是反序列化。

有很多通用的序列化实现:

  • Java 和 Go 语言都内置了序列化实现
  • 一些流行的开源序列化实现,比如,Google 的 Protobuf、Kryo、Hessian 等
  • 此外,像 JSON、XML 这些标准的数据格式,也可以作为一种序列化实现来使用

4,如何更好的进行内存管理

现代的编程语言(比如 Java,Go 等),大多采用自动内存管理机制,虚拟机会不定期执行垃圾回收,自动释放不再使用的内存,但是执行垃圾回收的过程会导致进程暂停

在高并发的场景下,会产生大量的待回收的对象,需要频繁地执行垃圾回收,导致程序长时间暂停,程序看起来就像卡死了一样。

为了缓解这个问题,我们需要尽量少地使用一次性对象,对于需要频繁使用,占用内存较大的一次性对象,可以考虑自行回收并重用这些对象,来减轻垃圾回收的压力。

5,Kafka 如何实现高性能

除了上面几节介绍到的一些通用的性能优化方法,Kafka 还使用了下面几个关键的技术点:

  • 使用批量处理的方式来提升系统吞吐能力。
    • 在客户端,当调用 send() 方法发送一条消息之后,无论你是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去。它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker。
    • 在服务端,Kafka 不会把一批消息再还原成多条消息,再一条一条地处理(这样太慢了)。Kafka 会把每个批消息当做一个“批消息”来处理。在 Broker 整个处理流程中,无论是写入磁盘、从磁盘读出来、还是复制到其他副本这些流程中,批消息都不会被解开,一直是作为一条“批消息”来进行处理。
    • 在消费时,消息同样是以批为单位进行传递的,Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理。
    • 构建批消息和解开批消息分别在发送端和消费端的客户端完成,不仅减轻了 Broker 的压力,最重要的是减少了 Broker 处理请求的次数,提升了总体的处理能力
  • 基于磁盘文件高性能顺序读写的特性来设计的存储结构。
    • Kafka 充分利用了磁盘的这个特性。它的存储设计非常简单,对于每个分区,它把从 Producer 收到的消息,顺序地写入对应的 log 文件中,一个文件写满了,就开启一个新的文件这样顺序写下去。消费的时候,也是从某个全局的位置开始,也就是某一个 log 文件中的某个位置开始,顺序地把消息读出来。
  • 利用操作系统的 PageCache 来缓存数据,减少 IO 并提升读性能。
    • 无论我们使用什么语言编写的程序,在调用系统的 API 读写文件的时候,并不会直接去读写磁盘上的文件,应用程序实际操作的都是 PageCache,也就是文件在内存中缓存的副本。
      • 一种是 PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间;
      • 另一种情况是,PageCache 中没有数据,这时候操作系统会引发一个缺页中断,应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
    • 应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批一批地写到磁盘上。
  • 使用零拷贝技术加速消费流程
    • 通常在服务器上进行“读取文件,并通过网络发送出去”,要经过下面步骤:
      • 从文件复制数据到 PageCache 中(如果命中 PageCache,这一步可以省掉)
      • 从 PageCache 复制到应用程序的内存空间中(也就是我们可以操作的对象所在的内存)
      • 从应用程序的内存空间复制到 Socket 的缓冲区(这就是调用网络 API 发送数据的过程)
    • 零拷贝就是:上面的 2、3 步骤两次复制合并成一次复制。
      • 直接从 PageCache 中把数据复制到 Socket 缓冲区中,这样不仅减少一次数据复制,更重要的是,由于不用把数据复制到用户内存空间,DMA 控制器可以直接完成数据复制,不需要 CPU 参与,速度更快。

零拷贝 API:

#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

6,如何正确使用锁

使用锁的第一条原则:如果能不用锁,就不用锁

只有在并发环境中,共享资源不支持并发访问,或者说并发访问共享资源会导致系统错误的情况下,才需要使用锁。

在 Java 中使用锁的示例:

// 使用锁
private Lock lock = new ReentrantLock();

public void visitShareResWithLock() {
  lock.lock();
  try {
    // 在这里安全的访问共享资源
  } finally {
    lock.unlock();
  }
}

// 使用同步代码块
private Object lock = new Object();

public void visitShareResWithLock() {
  synchronized (lock) {
    // 在这里安全的访问共享资源
  }
}

使用读写锁提高性能,Java 示例:

ReadWriteLock rwlock = new ReentrantReadWriteLock();

public void read() {
  rwlock.readLock().lock();
  try {
    // 在这儿读取共享数据
  } finally {
    rwlock.readLock().unlock();
  }
}

public void write() {
  rwlock.writeLock().lock();
  try {
    // 在这儿更新共享数据
  } finally {
    rwlock.writeLock().unlock();
  }
}

读写锁的特点:

  • 读访问可以并发执行。
  • 写的同时不能并发读,也不能并发写。

7,使用硬件同步原语替代锁

硬件同步原语是由计算机硬件提供的一组原子操作,比较常用的原语主要是:

  • CAS(Compare and Swap):意思是先比较,再交换
  • FAA(Fetch and Add):

7,实现一个 RPC 框架

https://github.com/liyue2008/simple-rpc-framework