公号:码农充电站pro

主页:https://codeshellme.github.io

这 3 篇文章是我在学习 Redis 的过程中,总结的笔记:

7,Redis 中如何进行原子操作

7.1,原子操作要解决的问题

原子操作是一种提供并发访问控制的方法(另一种是锁)。原子操作是指执行过程保持原子性的操作,而且原子操作执行时并不需要再加锁,实现了无锁操作。这样一来,既能保证并发控制,还能减少对系统并发性能的影响。

原子操作,是指执行多个写命令操作时,这些命令操作要么全部完成,要么都不完成。

当有多个客户端对同一份数据执行 “读取 - 修改 - 写回” 时,就需要用到原子操作,比如下面伪代码:

# 如果对临界区代码的执行没有并发控制机制,就会出现数据更新错误
current = GET(id) # 从 Redis 中获取数据
current--		  # 修改数据
SET(id, current)  # 写回数据

解决该问题的一种方法是使用锁:

LOCK()				# 获取分布式锁
current = GET(id)
current--
SET(id, current)
UNLOCK()			# 释放分布式锁

加锁的缺点是会导致系统并发性能降低。原子操作也能实现并发控制,但是原子操作对系统并发性能的影响较小。

7.2,Redis 中的原子操作

Redis 中有两种方式可实现原子操作:

  • 把多个操作在 Redis 中实现成一个操作,也就是单命令操作
    • Redis 是使用单线程来串行处理客户端的请求操作命令的,当 Redis 执行某个命令操作时,其他命令是无法执行的,这相当于命令操作是互斥执行的
    • Redis 提供了 INCR/DECR 单命令操作,把“读取-修改-写回”转变为一个原子操作了
    • INCR/DECR 命令可以对数据进行增值 / 减值操作
    • 如果要执行的操作不是简单地增减数据,而是有更加复杂的判断逻辑或者是其他操作,那么单命令操作就不适用了。这时可以使用 Lua 脚本的方式
  • 把多个操作写到一个 Lua 脚本中,以原子性方式执行单个 Lua 脚本
    • Redis 把整个 Lua 脚本作为一个整体执行,在执行过程中不会被其他命令打断,从而保证了 Lua 脚本中操作的原子性
    • 就是把多个操作编写到一个 Lua 脚本中,然后用 EVAL 命令来执行脚本
    • 建议:在编写 Lua 脚本时,不要把不需要做并发控制的操作写入脚本中,以免降低 Redis 的并发性能

INCR/DECR 单命令操作示例:

# 该命令直接完成对商品 id 的库存值减 1 操作
# 即使有多个客户端执行下面的代码,也不用担心出现库存值扣减错误的问题
DECR id

LUA 脚本示例,文件名为 lua.script

local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire",KEYS[1],60)
end

接下来使用 redis-cli,带上 eval 选项,来执行该脚本。脚本所需的参数将通过 keys 和 args 进行传递。

redis-cli  --eval lua.script  keys , args

7.3,Redis 中的事务操作

Redis 中使用 MULTIEXEC 命令来实现简单的事务 。当多个命令及其参数本身无误时,MULTI 和 EXEC 命令可以保证多个命令的原子性。

  • MULTI:表示一系列原子性操作的开始
    • Redis 收到这个命令后,接下来再收到的命令会放到一个内部队列中,后续一起执行,从而保证原子性
  • EXEC:表示一系列原子性操作的结束
    • Redis 收到这个命令后,就表示所有要保证原子性的命令操作都已经发送完成了。此时,Redis 开始执行刚才放到内部队列中的所有命令操作。
  • DISCARD :是 EXEC 命令的反操作,表示放弃事务执行
    • Redis 并不支持事务的回滚操作

如下图所示:

在这里插入图片描述

示例:

127.0.0.1:6379> MULTI
OK

127.0.0.1:6379> HSET device:temperature 202008030911 26.8
QUEUED # 表示命令暂时入队,先不执行

127.0.0.1:6379> ZADD device:temperature 202008030911 26.8
QUEUED # 表示命令暂时入队,先不执行

127.0.0.1:6379> EXEC # 真正的执行命令
1) (integer) 1
2) (integer) 1

Redis 是否支持事务的所有特征

事务有四大特征,即 ACID 属性,Redis 对其的支持情况如下:

  • 原子性(Atomicity):如果事务正常执行,没有发生任何错误,那么 MULTI 和 EXEC 配合,就可以保证多个操作都完成。但是,如果事务执行发生错误了,原子性还能保证吗?
    • 第一种情况:在执行 EXEC 命令前,客户端发送的操作命令本身就有错误(比如语法错误,使用了不存在的命令),在命令入队时就被 Redis 实例判断出来了。此时在执行 EXEC 命令后,Redis 就会拒绝执行所有提交的命令操作,从而保证了原子性
    • 第二种情况:事务操作入队时,命令和操作的数据类型不匹配,但 Redis 实例没有检查出错误。在执行完 EXEC 命令以后,Redis 实际执行这些事务操作时,就会报错。但是,虽然 Redis 会对错误命令报错,但还是会把正确的命令执行完。在这种情况下,事务的原子性就无法得到保证了
    • 第三种情况:在执行事务的 EXEC 命令时,Redis 实例发生了故障,导致事务执行失败。这种情况下,如果 Redis 开启了 AOF 日志,那么只会有部分的事务操作被记录到 AOF 日志中。我们需要使用 redis-check-aof 工具检查 AOF 日志文件,这个工具可以把未完成的事务操作从 AOF 文件中去除。这样一来,我们使用 AOF 恢复实例后,事务操作不会再被执行,从而保证了原子性。如果 AOF 日志并没有开启,那么实例重启后,数据也都没法恢复了,此时,也就谈不上原子性了。
  • 一致性(Consistency):在命令执行错误或 Redis 发生故障的情况下,Redis 事务机制对一致性属性是有保证的。
  • 隔离性(Isolation):
    • 并发操作在 EXEC 命令前执行,此时,隔离性的保证要使用 WATCH 机制来实现,否则隔离性无法保证;
    • 并发操作在 EXEC 命令后执行,此时,隔离性可以保证。
  • 持久性(Durability)
    • 如果 Redis 没有使用 RDB 或 AOF,那么事务的持久化属性肯定得不到保证
    • 如果 Redis 使用了 RDB 模式,那么,在一个事务执行后,而下一次的 RDB 快照还未执行前,如果发生了实例宕机,这种情况下,事务修改的数据也是不能保证持久化的。
    • 如果 Redis 采用了 AOF 模式,因为 AOF 模式的三种配置选项 no、everysec 和 always 都会存在数据丢失的情况,所以,事务的持久性属性也还是得不到保证

Redis 中的 WATCH 机制

一个事务的 EXEC 命令还没有执行时,事务的命令操作是暂存在命令队列中的。此时,如果有其它的并发操作,需要看事务是否使用了 WATCH 机制。

WATCH 机制的作用是,在事务执行前,监控一个或多个键的值变化情况,当事务调用 EXEC 命令执行时,WATCH 机制会先检查监控的键是否被其它客户端修改了。如果修改了,就放弃事务执行,避免事务的隔离性被破坏。然后,客户端可以再次执行事务,此时,如果没有并发修改事务数据的操作了,事务就能正常执行,隔离性也得到了保证。

下面是 WATCH 命令的使用:

在这里插入图片描述

在 t4 时,实例收到客户端 X 发送的 EXEC 命令,但是,实例的 WATCH 机制发现 a:stock 已经被修改了,就会放弃事务执行。这样,事务的隔离性就可以得到保证了。

在这里插入图片描述

8,Redis 是否适合用作消息队列

一个消息队列的一般架构:

在这里插入图片描述

消息队列的三个责任:

  • 消息保序
  • 处理重复消息
  • 保证消息可靠性:不丢消息,主要针对以下情况:
    • 当消费者已经读取了消息,在消息还没有处理完的时候,消费者宕机了
    • 这时候,要保证消费者重启后,还能读到那个没有处理完的消息

Redis 中的 ListStreams 两种数据类型,就可以满足消息队列的这三个需求。

8.1,List 类型

List 对消息队列的支持:

  • 消息保序:支持
  • 处理重复消息:不支持;有两种解决方案:
    • 生产者给每一个消息提供全局唯一的 ID 号;消费者把处理过的消息的 ID 号记录下来
    • 利用幂等性原理:对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的,这样就不怕消息重复了
  • 保证消息可靠性:不支持
    • 消费者读取消息后,List 中的消息就没有了,如果消息没有被处理完毕,那么消息就丢失了
    • 针对这种情况,Redis 的 List 提供了以下解决方案:
    • List 类型提供了 BRPOPLPUSH 命令,该命令可以让消费者从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。
    • 这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了
    • 在这里插入图片描述

List 类型的常用命令:

  • LPUSH:写入数据
  • RPOP:非阻塞读取
  • BRPOP:阻塞读取
  • BRPOPLPUSH

8.2,发布订阅模型

Redis 中的 Pub/Sub 模型支持多生产者-多消费者的场景。

List 其实是属于模型,而 Pub/Sub 其实属于模型。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立数据转发通道(内存),把符合规则的数据,从一端转发到另一端。

在这里插入图片描述 Redis 中的 Pub/Sub 的特点:

  • 支持多生产者-多消费者的场景
  • 在以下场景下会丢数据:
    • 消费者下线:
      • 如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉(不会进行存储)
      • 如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部丢弃
      • 所以,在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失
    • Redis 宕机:Pub/Sub 的相关操作的数据并没有进行持久化
    • 消息堆积:当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生
      • 每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个缓冲区,这个缓冲区就是一块内存。
      • 这个缓冲区是有上限的(可配置),当超过了缓冲区配置的上限,Redis 就会把这个消费者踢下线,这时消费者就会消费失败,从而丢失数据。
      • 缓冲区配置示例:client-output-buffer-limit pubsub 32mb 8mb 60

相关命令:

  • SUBSCRIBE :订阅消息(队列)
  • PUBLISH :向队列中发布消息

8.3,Streams 类型

Redis 从 5.0 版本开始提供 Streams 数据类型,和 List 相比,Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取。

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中,我们只需要配置好持久化策略。

List 对消息队列的支持:

  • 消息保序:支持
  • 处理重复消息:支持
  • 保证消息可靠性:支持

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID
    • 消息的格式是键 - 值对形式
  • XREAD:用于读取消息,可以按 ID 读取数据
  • XREADGROUP:按消费组形式读取消息
  • XPENDINGXACK
    • XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息
    • XACK 命令用于向消息队列确认消息处理已完成
      • 如果消费者没有发送 XACK 命令,消息仍然会留存(防止消息丢失)。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

Stream 可以指定队列最大长度,为了避免数据量太多导致内存崩溃。

XADD 命令的使用示例:

XADD mqstream * repo 5
"1599203861727-0"  # 返回全局唯一 ID
# 第一部分“1599203861727”是数据插入时,以毫秒为单位的时间
# 第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的

上面的命令,往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。

消息队列名称后面的 *,表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如“1599203861727-0”。我们也可以不用 *,直接在消息队列名称后自行设定一个 ID 号,只要保证这个 ID 号是全局唯一的就行。相比自行设定 ID 号,使用 * 会更加方便高效。

XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。

例如,下面的命令,从 ID 号为 1599203861727-0 的消息开始,读取后续的所有消息(示例中一共 3 条)。

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

在调用 XRAED 时还可以设定 block 配置项,实现阻塞读取。当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。

下面的示例命令:

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

命令最后的$符号表示读取最新的消息;同时设置了 block 10000 ,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒再返回。上面命令中的 XREAD 执行后,消息队列中一直没有消息,所以,XREAD 在 10 秒后返回空值(nil)。

Streams 可以使用 XGROUP 创建消费组,之后,可以使用 XREADGROUP 命令让消费组内的消费者读取消息

例如,下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream。

XGROUP create mqstream group1 0
OK

然后执行下面命令:

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

该命令让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息。命令最后的参数 >,表示从第一条尚未被消费的消息开始读取。

因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共 4 条)。

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。

比如,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了:

XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如下面命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

用 XPENDING 命令查看 group2 中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

查看某个消费者具体读取了哪些数据:

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
   2) "consumer2"
   3) (integer) 513336
   4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1599274912765-0。

使用 XACK 命令通知 Streams:

XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

8.4,总结

把 Redis 当作队列来使用时,始终面临的 2 个问题:

  • Redis 本身可能会丢数据
  • 面对消息积压,Redis 内存资源紧张

在这里插入图片描述

9,使用 Redis 实现分布式锁

在分布式系统中,当有多个客户端需要获取锁时,我们需要分布式锁。此时,锁保存在一个共享存储系统中的,可以被多个客户端共享访问和获取。

分布式锁可以用一个变量来实现:

  • 加锁时需要判断锁变量的值,根据锁变量值来判断能否加锁成功
    • 在分布式场景下,锁变量需要由一个共享存储系统来维护
    • 那么,加锁和释放锁的操作就变成了读取、判断和设置共享存储系统中的锁变量值
  • 释放锁时需要把锁变量值设置为 0,表明客户端不再持有锁

实现分布式锁的两个要求:

  • 要求一:分布式锁的加锁和释放锁的过程,涉及多个操作。所以,在实现分布式锁时,我们需要保证这些锁操作的原子性
  • 要求二:共享存储系统保存了锁变量,如果共享存储系统发生故障或宕机,那么客户端也就无法进行锁操作了。在实现分布式锁时,我们需要考虑保证共享存储系统的可靠性,进而保证锁的可靠性

我们既可以基于单个 Redis 节点来实现,也可以基于多个 Redis 节点实现。在这两种情况下,锁的可靠性是不一样的。

9.1,基于单个 Redis 节点实现

我们可以使用一个键值对来表示一把锁:

  • 键表示锁的名称
  • 值表示锁的状态
    • 0 表示没有加锁
    • 1 表示已加锁

加锁操作需要保证原子性,有两种方式:

  • 单命令方式
    • SETNX 命令:它用于设置键值对的值,它在执行时会判断键值对是否存在,如果不存在,就设置键值对的值,如果存在,就不做任何设置
      • SETNX key 1 (加锁)
      • DEL key(释放锁)
      • 为了防止某个客户端在获取锁后,因发生异常而永远无法释放锁,因此需要给 key 加一个过期时间
      • 这种方式还有一个问题是,当一个客户端获取了锁,如果其它客户端执行 DEL 命令,也可以释放锁,这就会产生混乱
      • 在这里插入图片描述
      • 解决办法是使用一个唯一标识,只有同一个客户端才能释放它所加的锁
      • 此时可以使用 SET 命令
    • SET lock_key unique_value NX PX 10000 (用 SET 来加锁,Redis 2.6.12 后支持)
      • unique_value 表示客户端唯一性的标识
      • NX 用来实现“不存在即设置”,表示只有在键值对不存在时,才会进行设置,否则不做赋值操作
      • EXPX 选项,用来设置键值对的过期时间,EX 的时间单位是秒,PX 的时间单位是毫秒
  • lua 脚本方式

注意:SETNX + EXPIRE 无法保证原子性,所以,SETNX 命令不能用来实现分布式锁,但是如果写在 lua 脚本中,是可以的

在使用 SET 命令加完锁后,在释放锁时,需要判断锁变量的值,是否等于执行释放锁操作的客户端的唯一标识,需要使用到 lua 脚本来保证原子性:

//释放锁 比较unique_value是否相等,避免误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

然后执行下面来释放锁:

redis-cli  --eval  unlock.script lock_key , unique_value 

9.2,基于多个 Redis 节点实现

要实现高可靠的分布式锁时,就不能只依赖单个的命令操作了,我们需要按照一定的步骤和规则进行加解锁操作,否则,就可能会出现锁无法工作的情况。

“一定的步骤和规则”就是分布式锁的算法,Redis 的开发者 Antirez 提出了分布式锁算法 Redlock

Redlock 算法的基本思路,是让客户端和多个独立的 Redis 实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那就认为,客户端成功地获得分布式锁了,否则加锁失败。

这样一来,即使有单个 Redis 实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。

Redlock 的方案基于 2 个前提:

  • 不再需要部署从库和哨兵实例,只部署主库
  • 但主库要部署多个,官方推荐至少 5 个实例

也就是说,想用使用 Redlock,你至少要部署 5 个 Redis 实例,而且都是主库,它们之间没有任何关系,都是一个个孤立的实例。注意:不是部署 Redis Cluster,就是部署 5 个简单的 Redis 实例

Redlock 算法的实现需要有 N 个独立的 Redis 实例,我们可以分成 3 步来完成加锁操作:

  • 客户端获取当前时间
  • 客户端按顺序依次向 N 个 Redis 实例执行加锁操作,使用 SET 命令
    • 如果客户端在和一个 Redis 实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个 Redis 实例继续请求加锁。
    • 加锁操作的超时时间需要远远地小于锁的有效时间,一般也就是设置为几十毫秒
  • 一旦客户端完成了和所有 Redis 实例的加锁操作,客户端就要计算整个加锁过程的总耗时

客户端只有在满足下面的这两个条件时,才能认为是加锁成功

  • 客户端从超过半数(大于等于 N/2+1)的 Redis 实例上成功获取到了锁
  • 客户端获取锁的总耗时没有超过锁的有效时间

在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成数据操作,锁就过期了的情况。(注意,即使这样,如果客户端成功获取锁后,仍然无法在完成业务操作之前释放锁,这就会导致锁过期而被 Redis 释放掉锁,此时其它客户端就可以获取到锁了,那这种情况下,分布式锁已经起不到作用了

如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端向所有 Redis 节点发起释放锁的操作。

在 Redlock 算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua 脚本就可以了。

9.3,如何确定锁的过期时间

Redisson 是一个 Java 语言实现的 Redis SDK 客户端,在使用分布式锁时,它采用了自动续期的方案来避免锁过期。

所谓的自动续期是这样的:

  • 加锁时,先设置一个过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间。
  • 在这里插入图片描述

Redisson 还封装了很多易用的功能:可重入锁乐观锁公平锁读写锁Redlock