分布式锁完全指南
从并发控制到分布式协调:为什么现代应用需要锁机制
December 25, 2024·12 min read·Yimin
#并发#锁#分布式#Redis#数据一致性
本文带你彻底搞懂锁的核心原理,以及为什么没有正确的锁机制,你的应用会出现各种诡异的 bug。
🎯 为什么需要锁?
真实场景:抢票系统崩了
某演唱会开票,10 秒内涌入 10 万用户:
用户 A:查询余票 → 剩余 1 张 → 点击购买 → 成功!
用户 B:查询余票 → 剩余 1 张 → 点击购买 → 成功!
用户 C:查询余票 → 剩余 1 张 → 点击购买 → 成功!
...
结果:1 张票卖出了 50 份!😱
问题根源:
# ❌ 没有锁的代码
def buy_ticket(user_id):
tickets = db.query("SELECT stock FROM tickets WHERE id=1")
if tickets.stock > 0:
# ⚠️ 这里有时间差!多个请求可能同时读到 stock > 0
db.execute("UPDATE tickets SET stock = stock - 1 WHERE id=1")
create_order(user_id)
return "购买成功"
return "已售罄"
时序图:
时间轴 →
用户 A: [读 stock=1] -------- [写 stock=0] -------- [创建订单]
用户 B: -------- [读 stock=1] -------- [写 stock=-1] -------- [创建订单]
用户 C: ------------ [读 stock=1] -------- [写 stock=-2] -------- [创建订单]
结果:超卖!数据不一致!
没有锁机制,你的应用就像没有红绿灯的十字路口。
🔍 什么是锁?
锁的本质
锁是一种同步机制,保证在同一时刻,只有一个线程/进程/机器能访问临界资源。
┌─────────────────────────────────────────────┐
│ 共享资源(数据库、文件) │
│ │
│ ┌───────────────┐ │
│ │ 临界区代码 │ ← 需要保护 │
│ └───────────────┘ │
└─────────────────────────────────────────────┘
↑ ↑ ↑
线程 A 线程 B 线程 C
(持锁) (等待) (等待)
核心目标:
- ✅ 互斥(Mutual Exclusion):同一时刻只有一个持锁者
- ✅ 可见性(Visibility):持锁者的修改对其他人可见
- ✅ 有序性(Ordering):按顺序获取锁,避免混乱
🏛️ 锁的分类体系
按乐观/悲观分类
1️⃣ 悲观锁(Pessimistic Lock)
哲学:假设一定会冲突,提前加锁。
# 数据库悲观锁
def transfer_money(from_account, to_account, amount):
with db.transaction():
# SELECT ... FOR UPDATE 会锁住这一行
from_balance = db.query(
"SELECT balance FROM accounts WHERE id = ? FOR UPDATE",
from_account
)
if from_balance >= amount:
db.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
amount, from_account)
db.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
amount, to_account)
return True
return False
特点:
- ✅ 数据强一致
- ✅ 适合写多读少场景
- ❌ 性能较低(会阻塞其他请求)
- ❌ 可能死锁
2️⃣ 乐观锁(Optimistic Lock)
哲学:假设不会冲突,提交时才检查。
# 版本号乐观锁
def update_product(product_id, new_price):
# 读取当前数据和版本号
product = db.query(
"SELECT price, version FROM products WHERE id = ?",
product_id
)
# 更新时检查版本号
rows_affected = db.execute(
"""UPDATE products
SET price = ?, version = version + 1
WHERE id = ? AND version = ?""",
new_price, product_id, product.version
)
if rows_affected == 0:
# 版本号不匹配,说明被别人改了
raise ConflictError("数据已被修改,请重试")
return True
特点:
- ✅ 性能高(不阻塞读操作)
- ✅ 适合读多写少场景
- ❌ 冲突时需要重试
- ❌ 不适合高并发写
对比表格:
| 特性 | 悲观锁 | 乐观锁 |
|---|---|---|
| 假设 | 一定冲突 | 很少冲突 |
| 加锁时机 | 读取前 | 提交时 |
| 性能 | 较低 | 较高 |
| 适用场景 | 写多 | 读多 |
| 实现方式 | SELECT FOR UPDATE | 版本号/CAS |
按锁范围分类
3️⃣ 本地锁(Local Lock)
作用域:单个进程内的线程间同步。
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock: # 自动加锁和解锁
counter += 1
# 多线程安全
threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 1000(正确)
特点:
- ✅ 速度极快(内存操作)
- ✅ 实现简单
- ❌ 只能保护单进程内的并发
- ❌ 不适用于分布式系统
4️⃣ 分布式锁(Distributed Lock)
作用域:多个进程/多台机器间的同步。
# Redis 分布式锁
async def process_order(order_id):
lock_key = f"lock:order:{order_id}"
lock_value = str(uuid.uuid4())
# 尝试获取锁(SET NX EX)
acquired = await redis.set(
lock_key,
lock_value,
nx=True, # 不存在时才设置
ex=30 # 30 秒后自动释放
)
if not acquired:
return "订单处理中,请稍后"
try:
# 临界区:处理订单
result = await do_process_order(order_id)
return result
finally:
# 释放锁(用 Lua 保证原子性)
await redis.eval("""
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
""", 1, lock_key, lock_value)
特点:
- ✅ 跨进程/跨机器
- ✅ 适用于分布式系统
- ❌ 实现复杂(需要考虑网络、时钟等问题)
- ❌ 性能较本地锁低
按锁粒度分类
5️⃣ 表锁 vs 行锁
┌─────────────────────────────────────────┐
│ 表锁:锁住整张表 │
│ │
│ ❌ 粒度大,并发低 │
│ ✅ 实现简单 │
│ │
│ LOCK TABLES orders WRITE; │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 行锁:只锁住特定行 │
│ │
│ ✅ 粒度小,并发高 │
│ ❌ 实现复杂 │
│ │
│ SELECT * FROM orders │
│ WHERE id = 123 FOR UPDATE; │
└─────────────────────────────────────────┘
性能对比:
并发请求数 vs 吞吐量
表锁: ▂▂▂▂▂▂▂▂▂▂ (低)
行锁: ▂▄▆█████▇▅ (高)
10 50 100 200 (并发数)
6️⃣ 读写锁(Read-Write Lock)
核心思想:读读不互斥,读写/写写互斥。
import threading
rw_lock = threading.RLock()
readers = 0
def read_data():
global readers
with rw_lock:
readers += 1
# 多个读者可以同时读
try:
data = shared_data.copy()
return data
finally:
with rw_lock:
readers -= 1
def write_data(new_data):
with rw_lock:
# 写操作独占
shared_data.update(new_data)
适用场景:
- ✅ 读多写少(如配置中心、缓存)
- ✅ 提高读并发
- ❌ 写操作仍需独占
🚨 常见问题与陷阱
1️⃣ 死锁(Deadlock)
经典场景:银行转账
# ❌ 会死锁的代码
def transfer(from_id, to_id, amount):
lock_a = locks[from_id]
lock_b = locks[to_id]
with lock_a:
with lock_b:
# 转账逻辑
pass
# 同时执行:
Thread-1: transfer(A, B, 100) # 持有 A,等待 B
Thread-2: transfer(B, A, 50) # 持有 B,等待 A
→ 死锁!
死锁四要素:
- 互斥:资源不能共享
- 持有并等待:持有锁 A,等待锁 B
- 不可剥夺:不能强制抢锁
- 循环等待:A → B → A 形成环
解决方案 1:全局顺序加锁
# ✅ 永远按 ID 顺序加锁
def transfer(from_id, to_id, amount):
first_id = min(from_id, to_id)
second_id = max(from_id, to_id)
lock_a = locks[first_id]
lock_b = locks[second_id]
with lock_a:
with lock_b:
# 转账逻辑
pass
解决方案 2:超时机制
# ✅ 加锁失败时超时返回
def transfer(from_id, to_id, amount):
lock_a = locks[from_id]
lock_b = locks[to_id]
if lock_a.acquire(timeout=5):
try:
if lock_b.acquire(timeout=5):
try:
# 转账逻辑
pass
finally:
lock_b.release()
else:
raise TimeoutError("获取锁超时")
finally:
lock_a.release()
else:
raise TimeoutError("获取锁超时")
2️⃣ 活锁(Livelock)
场景:两个人在走廊相遇
时刻 1: A 往左躲, B 往右躲 → 还是挡住
时刻 2: A 往右躲, B 往左躲 → 还是挡住
时刻 3: A 往左躲, B 往右躲 → 还是挡住
...
→ 虽然都在"工作",但没有进展
代码示例:
# ❌ 活锁:重试时机相同
def acquire_locks():
while True:
if try_lock_a() and try_lock_b():
return True
else:
release_all_locks()
time.sleep(1) # 固定间隔
# 多个线程可能同时重试
解决方案:随机退避
# ✅ 随机退避避免同步
import random
def acquire_locks():
while True:
if try_lock_a() and try_lock_b():
return True
else:
release_all_locks()
# 随机等待,避免同步重试
time.sleep(random.uniform(0.1, 1.0))
3️⃣ 锁超时导致数据不一致
场景:分布式锁过期
时间轴 →
进程 A: [获取锁] -------- [处理中...] -------- [锁过期❌] -------- [写数据库]
进程 B: -------------- [获取锁✅] -------- [写数据库] -------- [释放锁]
结果:两个进程同时持有"锁",数据被覆盖!
问题代码:
# ❌ 固定超时,可能不够用
lock_timeout = 10 # 秒
acquired = redis.set(lock_key, lock_value, nx=True, ex=lock_timeout)
# 如果处理逻辑超过 10 秒,锁就失效了
result = slow_process() # 可能需要 15 秒
解决方案 1:自动续期(看门狗)
# ✅ 后台线程定期续期
class LockWithWatchdog:
def __init__(self, redis, key, timeout=30):
self.redis = redis
self.key = key
self.timeout = timeout
self.value = str(uuid.uuid4())
self._stop_watchdog = False
async def acquire(self):
acquired = await self.redis.set(
self.key, self.value, nx=True, ex=self.timeout
)
if acquired:
# 启动看门狗
asyncio.create_task(self._watchdog())
return acquired
async def _watchdog(self):
"""每隔 timeout/3 续期一次"""
while not self._stop_watchdog:
await asyncio.sleep(self.timeout / 3)
await self.redis.expire(self.key, self.timeout)
async def release(self):
self._stop_watchdog = True
await self.redis.eval(
"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end",
1, self.key, self.value
)
解决方案 2:检查锁所有权
# ✅ 写入前检查锁是否还属于自己
async def safe_write_with_lock(lock_key, lock_value):
# 处理业务逻辑
result = await process_data()
# 写入前检查锁
current_owner = await redis.get(lock_key)
if current_owner != lock_value:
raise LockExpiredError("锁已过期,放弃写入")
# 确认锁还在,才写入
await db.write(result)
4️⃣ 惊群效应(Thundering Herd)
场景:1000 个进程等待同一个锁
时刻 T:
进程 A 持有锁 ──┐
│
时刻 T+1: │
进程 A 释放锁 ──┘
→ 1000 个等待进程同时被唤醒!
→ 只有 1 个能获取锁
→ 其他 999 个又继续等待
→ CPU 资源浪费
解决方案:公平锁/队列
# ✅ 使用 Redis List 实现公平队列
class FairLock:
async def acquire(self, key, timeout=30):
request_id = str(uuid.uuid4())
# 加入等待队列
await redis.rpush(f"{key}:queue", request_id)
# 轮询检查是否轮到自己
while True:
# 只有队列头部的请求尝试获取锁
first = await redis.lindex(f"{key}:queue", 0)
if first == request_id:
# 尝试获取锁
acquired = await redis.set(key, request_id, nx=True, ex=timeout)
if acquired:
await redis.lpop(f"{key}:queue")
return request_id
await asyncio.sleep(0.1)
5️⃣ 锁泄漏(Lock Leak)
场景:获取锁后异常退出
# ❌ 异常时锁没释放
def process_with_lock():
lock.acquire()
result = do_something() # 如果这里抛异常
lock.release() # 这行永远不会执行
return result
→ 锁永远不会释放,其他线程永远等待!
解决方案:使用 try-finally 或上下文管理器
# ✅ 方案 1:try-finally
def process_with_lock():
lock.acquire()
try:
return do_something()
finally:
lock.release() # 无论如何都会执行
# ✅ 方案 2:上下文管理器(推荐)
def process_with_lock():
with lock:
return do_something()
# ✅ 方案 3:分布式锁设置超时
lock_timeout = 30 # 最多 30 秒后自动释放
redis.set(lock_key, lock_value, nx=True, ex=lock_timeout)
🛠️ 分布式锁实现对比
Redis 单机锁
实现:
# SET NX EX 原子操作
acquired = redis.set("lock:resource", unique_id, nx=True, ex=30)
优点:
- ✅ 实现简单
- ✅ 性能高
缺点:
- ❌ Redis 宕机则锁失效
- ❌ 主从切换可能丢失锁
Redlock(Redis 官方推荐)
原理:向多个 Redis 实例(N=5)请求锁,超过半数成功才算获取锁。
def acquire_redlock(resource, ttl=30):
start_time = time.time()
lock_value = str(uuid.uuid4())
# 向 5 个独立 Redis 实例请求锁
successes = 0
for redis in redis_instances:
acquired = redis.set(resource, lock_value, nx=True, ex=ttl)
if acquired:
successes += 1
# 检查是否超过半数
elapsed = time.time() - start_time
if successes >= 3 and elapsed < ttl:
return lock_value # 成功
else:
# 释放所有已获取的锁
release_redlock(resource, lock_value)
return None
优点:
- ✅ 容错性高(可容忍 2 个节点故障)
- ✅ 避免单点故障
缺点:
- ❌ 实现复杂
- ❌ 性能较低(需要请求多个节点)
- ❌ 仍有争议(Martin Kleppmann 的反对)
Etcd/ZooKeeper 分布式锁
原理:利用强一致性存储系统的租约(Lease)机制。
# Etcd 实现
def acquire_etcd_lock(key, ttl=30):
# 创建租约
lease = etcd.lease(ttl)
# 尝试写入
try:
etcd.put(key, "locked", lease=lease)
return lease
except AlreadyExistsError:
return None
def release_etcd_lock(lease):
lease.revoke()
优点:
- ✅ 强一致性(基于 Raft/ZAB)
- ✅ 自动续期
- ✅ Watch 机制(无需轮询)
缺点:
- ❌ 需要额外部署 Etcd/ZK 集群
- ❌ 性能不如 Redis
数据库锁
实现:
-- 获取锁
INSERT INTO locks (resource, owner, expire_at)
VALUES ('resource_1', 'worker_1', NOW() + INTERVAL 30 SECOND)
ON DUPLICATE KEY UPDATE
owner = IF(expire_at < NOW(), 'worker_1', owner),
expire_at = IF(expire_at < NOW(), NOW() + INTERVAL 30 SECOND, expire_at);
-- 检查是否获取成功
SELECT owner FROM locks WHERE resource = 'resource_1';
优点:
- ✅ 无需额外组件
- ✅ 强一致性
缺点:
- ❌ 性能低(数据库 QPS 有限)
- ❌ 增加数据库负担
对比表格:
| 方案 | 性能 | 可靠性 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| Redis 单机 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐ | 低风险场景 |
| Redlock | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 高可用要求 |
| Etcd/ZK | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 强一致性要求 |
| 数据库 | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | 低并发场景 |
🎯 最佳实践
1️⃣ 锁的粒度要合适
太粗:性能低
# ❌ 锁住整个订单表
global_lock = Lock()
def process_order(order_id):
with global_lock: # 所有订单都串行了
# 处理订单
pass
太细:复杂度高
# ❌ 每个字段一个锁(过于复杂)
locks = {
'order.status': Lock(),
'order.amount': Lock(),
'order.items': Lock(),
}
刚好:按资源维度加锁
# ✅ 每个订单一个锁
order_locks = {}
def process_order(order_id):
if order_id not in order_locks:
order_locks[order_id] = Lock()
with order_locks[order_id]: # 只锁这个订单
# 处理订单
pass
2️⃣ 设置合理的超时时间
太短:锁提前过期
# ❌ 处理需要 30 秒,锁只有 10 秒
lock_timeout = 10
redis.set(lock_key, value, nx=True, ex=lock_timeout)
process_order() # 需要 30 秒
太长:故障恢复慢
# ❌ 如果进程崩溃,锁要 1 小时后才释放
lock_timeout = 3600
redis.set(lock_key, value, nx=True, ex=lock_timeout)
合理:P99 耗时 × 2 ~ 3
# ✅ 根据实际耗时设置
# 假设处理订单 P99 耗时 5 秒
lock_timeout = 15 # 5 秒 × 3
redis.set(lock_key, value, nx=True, ex=lock_timeout)
3️⃣ 使用唯一标识防止误删
问题场景:
进程 A: [获取锁 value=A] ── [处理超时] ── [锁过期] ────── [释放锁❌]
进程 B: ──────────────── [获取锁 value=B] ── [处理中] ──────────
↑
进程 A 误删了进程 B 的锁!
错误做法:
# ❌ 直接删除,可能误删
redis.set(lock_key, "locked", nx=True, ex=30)
# ... 处理 ...
redis.delete(lock_key) # 可能删掉别人的锁
正确做法:
# ✅ 使用唯一 ID + Lua 脚本
lock_value = str(uuid.uuid4())
redis.set(lock_key, lock_value, nx=True, ex=30)
# ... 处理 ...
# Lua 脚本保证原子性
redis.eval("""
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
""", 1, lock_key, lock_value)
4️⃣ 日志记录锁的获取和释放
便于排查问题:
# ✅ 完整的锁日志
class DistributedLock:
async def acquire(self, key, timeout=30):
owner = str(uuid.uuid4())
logger.info(f"[LOCK] Trying to acquire: key={key}, owner={owner}")
acquired = await redis.set(key, owner, nx=True, ex=timeout)
if acquired:
logger.info(f"[LOCK] Acquired: key={key}, owner={owner}, timeout={timeout}s")
else:
logger.warning(f"[LOCK] Failed to acquire: key={key}")
return acquired
async def release(self, key, owner):
logger.info(f"[LOCK] Releasing: key={key}, owner={owner}")
result = await redis.eval(lua_script, 1, key, owner)
if result:
logger.info(f"[LOCK] Released: key={key}, owner={owner}")
else:
logger.warning(f"[LOCK] Release failed (lock expired or owned by others): key={key}")
日志示例:
2024-12-25 10:00:00 [INFO] [LOCK] Trying to acquire: key=order:12345, owner=a1b2c3
2024-12-25 10:00:00 [INFO] [LOCK] Acquired: key=order:12345, owner=a1b2c3, timeout=30s
2024-12-25 10:00:15 [INFO] [LOCK] Released: key=order:12345, owner=a1b2c3
→ 可以清楚地看到锁的生命周期
5️⃣ 区分场景选择锁类型
决策树:
需要锁吗?
├─ 单进程 → 本地锁(threading.Lock)
└─ 多进程/分布式
├─ 读多写少 → 乐观锁(版本号)
└─ 写多 / 强一致性
├─ 低风险 → Redis 单机锁
├─ 高可用 → Redlock / Redis Cluster
└─ 强一致性 → Etcd / ZooKeeper
🚀 总结
锁的核心价值
| 没有锁 | 有锁 |
|---|---|
| 🙈 数据不一致 | 🔒 数据强一致 |
| 😰 超卖、重复扣款 | ✅ 业务逻辑正确 |
| 🔮 并发 bug 难调试 | 🎯 可预测的行为 |
| 💸 资金损失 | 💰 业务可靠 |
记忆口诀
🔒 悲观锁:先锁后操作,安全但慢
🌈 乐观锁:先操作后检查,快但可能冲突
🏠 本地锁:单进程,快
🌍 分布式锁:多进程,复杂
常见问题速查
| 问题 | 解决方案 |
|---|---|
| 死锁 | 全局顺序加锁 / 超时机制 |
| 活锁 | 随机退避 |
| 锁超时 | 看门狗续期 / 合理设置超时 |
| 惊群 | 公平锁 / 队列 |
| 锁泄漏 | try-finally / 上下文管理器 |
行动建议
第一步:识别临界区
- ✅ 找出所有并发访问的共享资源
- ✅ 分析是否有数据竞争
第二步:选择合适的锁
- ✅ 单进程 → 本地锁
- ✅ 分布式 → 分布式锁
- ✅ 读多写少 → 乐观锁
第三步:防御式编程
- ✅ 设置超时
- ✅ 使用唯一标识
- ✅ 记录日志
- ✅ try-finally 保证释放
第四步:测试并发场景
- ✅ 压测验证
- ✅ 混沌测试(故意制造失败)
- ✅ 监控锁的持有时间
📚 延伸阅读
锁是并发的红绿灯,没有它,你的系统就是混乱的十字路口。 🚦
现在,你的系统有合适的锁机制吗? 🤔