分布式锁完全指南

从并发控制到分布式协调:为什么现代应用需要锁机制

December 25, 2024·12 min read·Yimin
#并发##分布式#Redis#数据一致性

本文带你彻底搞懂锁的核心原理,以及为什么没有正确的锁机制,你的应用会出现各种诡异的 bug。

🎯 为什么需要锁?

真实场景:抢票系统崩了

某演唱会开票,10 秒内涌入 10 万用户:

用户 A:查询余票 → 剩余 1 张 → 点击购买 → 成功!
用户 B:查询余票 → 剩余 1 张 → 点击购买 → 成功!
用户 C:查询余票 → 剩余 1 张 → 点击购买 → 成功!
...

结果:1 张票卖出了 50 份!😱

问题根源:

# ❌ 没有锁的代码
def buy_ticket(user_id):
    tickets = db.query("SELECT stock FROM tickets WHERE id=1")
    if tickets.stock > 0:
        # ⚠️ 这里有时间差!多个请求可能同时读到 stock > 0
        db.execute("UPDATE tickets SET stock = stock - 1 WHERE id=1")
        create_order(user_id)
        return "购买成功"
    return "已售罄"

时序图:

时间轴 →
用户 A:  [读 stock=1] -------- [写 stock=0] -------- [创建订单]
用户 B:  -------- [读 stock=1] -------- [写 stock=-1] -------- [创建订单]
用户 C:  ------------ [读 stock=1] -------- [写 stock=-2] -------- [创建订单]

结果:超卖!数据不一致!

没有锁机制,你的应用就像没有红绿灯的十字路口。


🔍 什么是锁?

锁的本质

锁是一种同步机制,保证在同一时刻,只有一个线程/进程/机器能访问临界资源。

┌─────────────────────────────────────────────┐
│           共享资源(数据库、文件)            │
│                                             │
│           ┌───────────────┐                 │
│           │  临界区代码   │  ← 需要保护    │
│           └───────────────┘                 │
└─────────────────────────────────────────────┘
         ↑           ↑           ↑
      线程 A      线程 B      线程 C
      (持锁)    (等待)    (等待)

核心目标:

  • 互斥(Mutual Exclusion):同一时刻只有一个持锁者
  • 可见性(Visibility):持锁者的修改对其他人可见
  • 有序性(Ordering):按顺序获取锁,避免混乱

🏛️ 锁的分类体系

按乐观/悲观分类

1️⃣ 悲观锁(Pessimistic Lock)

哲学:假设一定会冲突,提前加锁。

# 数据库悲观锁
def transfer_money(from_account, to_account, amount):
    with db.transaction():
        # SELECT ... FOR UPDATE 会锁住这一行
        from_balance = db.query(
            "SELECT balance FROM accounts WHERE id = ? FOR UPDATE",
            from_account
        )
        
        if from_balance >= amount:
            db.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
                      amount, from_account)
            db.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                      amount, to_account)
            return True
        return False

特点:

  • ✅ 数据强一致
  • ✅ 适合写多读少场景
  • ❌ 性能较低(会阻塞其他请求)
  • ❌ 可能死锁

2️⃣ 乐观锁(Optimistic Lock)

哲学:假设不会冲突,提交时才检查。

# 版本号乐观锁
def update_product(product_id, new_price):
    # 读取当前数据和版本号
    product = db.query(
        "SELECT price, version FROM products WHERE id = ?",
        product_id
    )
    
    # 更新时检查版本号
    rows_affected = db.execute(
        """UPDATE products 
           SET price = ?, version = version + 1 
           WHERE id = ? AND version = ?""",
        new_price, product_id, product.version
    )
    
    if rows_affected == 0:
        # 版本号不匹配,说明被别人改了
        raise ConflictError("数据已被修改,请重试")
    
    return True

特点:

  • ✅ 性能高(不阻塞读操作)
  • ✅ 适合读多写少场景
  • ❌ 冲突时需要重试
  • ❌ 不适合高并发写

对比表格:

特性悲观锁乐观锁
假设一定冲突很少冲突
加锁时机读取前提交时
性能较低较高
适用场景写多读多
实现方式SELECT FOR UPDATE版本号/CAS

按锁范围分类

3️⃣ 本地锁(Local Lock)

作用域:单个进程内的线程间同步。

import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # 自动加锁和解锁
        counter += 1

# 多线程安全
threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 1000(正确)

特点:

  • ✅ 速度极快(内存操作)
  • ✅ 实现简单
  • ❌ 只能保护单进程内的并发
  • ❌ 不适用于分布式系统

4️⃣ 分布式锁(Distributed Lock)

作用域:多个进程/多台机器间的同步。

# Redis 分布式锁
async def process_order(order_id):
    lock_key = f"lock:order:{order_id}"
    lock_value = str(uuid.uuid4())
    
    # 尝试获取锁(SET NX EX)
    acquired = await redis.set(
        lock_key, 
        lock_value, 
        nx=True,  # 不存在时才设置
        ex=30     # 30 秒后自动释放
    )
    
    if not acquired:
        return "订单处理中,请稍后"
    
    try:
        # 临界区:处理订单
        result = await do_process_order(order_id)
        return result
    finally:
        # 释放锁(用 Lua 保证原子性)
        await redis.eval("""
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
        """, 1, lock_key, lock_value)

特点:

  • ✅ 跨进程/跨机器
  • ✅ 适用于分布式系统
  • ❌ 实现复杂(需要考虑网络、时钟等问题)
  • ❌ 性能较本地锁低

按锁粒度分类

5️⃣ 表锁 vs 行锁

┌─────────────────────────────────────────┐
│  表锁:锁住整张表                         │
│                                         │
│  ❌ 粒度大,并发低                       │
│  ✅ 实现简单                             │
│                                         │
│  LOCK TABLES orders WRITE;              │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│  行锁:只锁住特定行                       │
│                                         │
│  ✅ 粒度小,并发高                       │
│  ❌ 实现复杂                             │
│                                         │
│  SELECT * FROM orders                   │
│  WHERE id = 123 FOR UPDATE;             │
└─────────────────────────────────────────┘

性能对比:

并发请求数 vs 吞吐量

表锁:   ▂▂▂▂▂▂▂▂▂▂  (低)
行锁:   ▂▄▆█████▇▅  (高)
       10  50  100  200  (并发数)

6️⃣ 读写锁(Read-Write Lock)

核心思想:读读不互斥,读写/写写互斥。

import threading

rw_lock = threading.RLock()
readers = 0

def read_data():
    global readers
    with rw_lock:
        readers += 1
        # 多个读者可以同时读
    
    try:
        data = shared_data.copy()
        return data
    finally:
        with rw_lock:
            readers -= 1

def write_data(new_data):
    with rw_lock:
        # 写操作独占
        shared_data.update(new_data)

适用场景:

  • ✅ 读多写少(如配置中心、缓存)
  • ✅ 提高读并发
  • ❌ 写操作仍需独占

🚨 常见问题与陷阱

1️⃣ 死锁(Deadlock)

经典场景:银行转账

# ❌ 会死锁的代码
def transfer(from_id, to_id, amount):
    lock_a = locks[from_id]
    lock_b = locks[to_id]
    
    with lock_a:
        with lock_b:
            # 转账逻辑
            pass

# 同时执行:
Thread-1: transfer(A, B, 100)  # 持有 A,等待 B
Thread-2: transfer(B, A, 50)   # 持有 B,等待 A
→ 死锁!

死锁四要素:

  1. 互斥:资源不能共享
  2. 持有并等待:持有锁 A,等待锁 B
  3. 不可剥夺:不能强制抢锁
  4. 循环等待:A → B → A 形成环

解决方案 1:全局顺序加锁

# ✅ 永远按 ID 顺序加锁
def transfer(from_id, to_id, amount):
    first_id = min(from_id, to_id)
    second_id = max(from_id, to_id)
    
    lock_a = locks[first_id]
    lock_b = locks[second_id]
    
    with lock_a:
        with lock_b:
            # 转账逻辑
            pass

解决方案 2:超时机制

# ✅ 加锁失败时超时返回
def transfer(from_id, to_id, amount):
    lock_a = locks[from_id]
    lock_b = locks[to_id]
    
    if lock_a.acquire(timeout=5):
        try:
            if lock_b.acquire(timeout=5):
                try:
                    # 转账逻辑
                    pass
                finally:
                    lock_b.release()
            else:
                raise TimeoutError("获取锁超时")
        finally:
            lock_a.release()
    else:
        raise TimeoutError("获取锁超时")

2️⃣ 活锁(Livelock)

场景:两个人在走廊相遇

时刻 1: A 往左躲, B 往右躲 → 还是挡住
时刻 2: A 往右躲, B 往左躲 → 还是挡住
时刻 3: A 往左躲, B 往右躲 → 还是挡住
...
→ 虽然都在"工作",但没有进展

代码示例:

# ❌ 活锁:重试时机相同
def acquire_locks():
    while True:
        if try_lock_a() and try_lock_b():
            return True
        else:
            release_all_locks()
            time.sleep(1)  # 固定间隔
            # 多个线程可能同时重试

解决方案:随机退避

# ✅ 随机退避避免同步
import random

def acquire_locks():
    while True:
        if try_lock_a() and try_lock_b():
            return True
        else:
            release_all_locks()
            # 随机等待,避免同步重试
            time.sleep(random.uniform(0.1, 1.0))

3️⃣ 锁超时导致数据不一致

场景:分布式锁过期

时间轴 →
进程 A:  [获取锁] -------- [处理中...] -------- [锁过期❌] -------- [写数据库]
进程 B:  -------------- [获取锁✅] -------- [写数据库] -------- [释放锁]

结果:两个进程同时持有"锁",数据被覆盖!

问题代码:

# ❌ 固定超时,可能不够用
lock_timeout = 10  # 秒
acquired = redis.set(lock_key, lock_value, nx=True, ex=lock_timeout)

# 如果处理逻辑超过 10 秒,锁就失效了
result = slow_process()  # 可能需要 15 秒

解决方案 1:自动续期(看门狗)

# ✅ 后台线程定期续期
class LockWithWatchdog:
    def __init__(self, redis, key, timeout=30):
        self.redis = redis
        self.key = key
        self.timeout = timeout
        self.value = str(uuid.uuid4())
        self._stop_watchdog = False
    
    async def acquire(self):
        acquired = await self.redis.set(
            self.key, self.value, nx=True, ex=self.timeout
        )
        if acquired:
            # 启动看门狗
            asyncio.create_task(self._watchdog())
        return acquired
    
    async def _watchdog(self):
        """每隔 timeout/3 续期一次"""
        while not self._stop_watchdog:
            await asyncio.sleep(self.timeout / 3)
            await self.redis.expire(self.key, self.timeout)
    
    async def release(self):
        self._stop_watchdog = True
        await self.redis.eval(
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
            "return redis.call('del', KEYS[1]) else return 0 end",
            1, self.key, self.value
        )

解决方案 2:检查锁所有权

# ✅ 写入前检查锁是否还属于自己
async def safe_write_with_lock(lock_key, lock_value):
    # 处理业务逻辑
    result = await process_data()
    
    # 写入前检查锁
    current_owner = await redis.get(lock_key)
    if current_owner != lock_value:
        raise LockExpiredError("锁已过期,放弃写入")
    
    # 确认锁还在,才写入
    await db.write(result)

4️⃣ 惊群效应(Thundering Herd)

场景:1000 个进程等待同一个锁

时刻 T: 
  进程 A 持有锁 ──┐
                  │
时刻 T+1:         │
  进程 A 释放锁 ──┘
  
  → 1000 个等待进程同时被唤醒!
  → 只有 1 个能获取锁
  → 其他 999 个又继续等待
  → CPU 资源浪费

解决方案:公平锁/队列

# ✅ 使用 Redis List 实现公平队列
class FairLock:
    async def acquire(self, key, timeout=30):
        request_id = str(uuid.uuid4())
        
        # 加入等待队列
        await redis.rpush(f"{key}:queue", request_id)
        
        # 轮询检查是否轮到自己
        while True:
            # 只有队列头部的请求尝试获取锁
            first = await redis.lindex(f"{key}:queue", 0)
            if first == request_id:
                # 尝试获取锁
                acquired = await redis.set(key, request_id, nx=True, ex=timeout)
                if acquired:
                    await redis.lpop(f"{key}:queue")
                    return request_id
            
            await asyncio.sleep(0.1)

5️⃣ 锁泄漏(Lock Leak)

场景:获取锁后异常退出

# ❌ 异常时锁没释放
def process_with_lock():
    lock.acquire()
    
    result = do_something()  # 如果这里抛异常
    
    lock.release()  # 这行永远不会执行
    return result

→ 锁永远不会释放,其他线程永远等待!

解决方案:使用 try-finally 或上下文管理器

# ✅ 方案 1:try-finally
def process_with_lock():
    lock.acquire()
    try:
        return do_something()
    finally:
        lock.release()  # 无论如何都会执行

# ✅ 方案 2:上下文管理器(推荐)
def process_with_lock():
    with lock:
        return do_something()

# ✅ 方案 3:分布式锁设置超时
lock_timeout = 30  # 最多 30 秒后自动释放
redis.set(lock_key, lock_value, nx=True, ex=lock_timeout)

🛠️ 分布式锁实现对比

Redis 单机锁

实现:

# SET NX EX 原子操作
acquired = redis.set("lock:resource", unique_id, nx=True, ex=30)

优点:

  • ✅ 实现简单
  • ✅ 性能高

缺点:

  • ❌ Redis 宕机则锁失效
  • ❌ 主从切换可能丢失锁

Redlock(Redis 官方推荐)

原理:向多个 Redis 实例(N=5)请求锁,超过半数成功才算获取锁。

def acquire_redlock(resource, ttl=30):
    start_time = time.time()
    lock_value = str(uuid.uuid4())
    
    # 向 5 个独立 Redis 实例请求锁
    successes = 0
    for redis in redis_instances:
        acquired = redis.set(resource, lock_value, nx=True, ex=ttl)
        if acquired:
            successes += 1
    
    # 检查是否超过半数
    elapsed = time.time() - start_time
    if successes >= 3 and elapsed < ttl:
        return lock_value  # 成功
    else:
        # 释放所有已获取的锁
        release_redlock(resource, lock_value)
        return None

优点:

  • ✅ 容错性高(可容忍 2 个节点故障)
  • ✅ 避免单点故障

缺点:

  • ❌ 实现复杂
  • ❌ 性能较低(需要请求多个节点)
  • ❌ 仍有争议(Martin Kleppmann 的反对)

Etcd/ZooKeeper 分布式锁

原理:利用强一致性存储系统的租约(Lease)机制。

# Etcd 实现
def acquire_etcd_lock(key, ttl=30):
    # 创建租约
    lease = etcd.lease(ttl)
    
    # 尝试写入
    try:
        etcd.put(key, "locked", lease=lease)
        return lease
    except AlreadyExistsError:
        return None

def release_etcd_lock(lease):
    lease.revoke()

优点:

  • ✅ 强一致性(基于 Raft/ZAB)
  • ✅ 自动续期
  • ✅ Watch 机制(无需轮询)

缺点:

  • ❌ 需要额外部署 Etcd/ZK 集群
  • ❌ 性能不如 Redis

数据库锁

实现:

-- 获取锁
INSERT INTO locks (resource, owner, expire_at) 
VALUES ('resource_1', 'worker_1', NOW() + INTERVAL 30 SECOND)
ON DUPLICATE KEY UPDATE 
  owner = IF(expire_at < NOW(), 'worker_1', owner),
  expire_at = IF(expire_at < NOW(), NOW() + INTERVAL 30 SECOND, expire_at);

-- 检查是否获取成功
SELECT owner FROM locks WHERE resource = 'resource_1';

优点:

  • ✅ 无需额外组件
  • ✅ 强一致性

缺点:

  • ❌ 性能低(数据库 QPS 有限)
  • ❌ 增加数据库负担

对比表格:

方案性能可靠性复杂度适用场景
Redis 单机⭐⭐⭐⭐⭐⭐⭐低风险场景
Redlock⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐高可用要求
Etcd/ZK⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐强一致性要求
数据库⭐⭐⭐⭐⭐⭐⭐⭐低并发场景

🎯 最佳实践

1️⃣ 锁的粒度要合适

太粗:性能低

# ❌ 锁住整个订单表
global_lock = Lock()

def process_order(order_id):
    with global_lock:  # 所有订单都串行了
        # 处理订单
        pass

太细:复杂度高

# ❌ 每个字段一个锁(过于复杂)
locks = {
    'order.status': Lock(),
    'order.amount': Lock(),
    'order.items': Lock(),
}

刚好:按资源维度加锁

# ✅ 每个订单一个锁
order_locks = {}

def process_order(order_id):
    if order_id not in order_locks:
        order_locks[order_id] = Lock()
    
    with order_locks[order_id]:  # 只锁这个订单
        # 处理订单
        pass

2️⃣ 设置合理的超时时间

太短:锁提前过期

# ❌ 处理需要 30 秒,锁只有 10 秒
lock_timeout = 10
redis.set(lock_key, value, nx=True, ex=lock_timeout)
process_order()  # 需要 30 秒

太长:故障恢复慢

# ❌ 如果进程崩溃,锁要 1 小时后才释放
lock_timeout = 3600
redis.set(lock_key, value, nx=True, ex=lock_timeout)

合理:P99 耗时 × 2 ~ 3

# ✅ 根据实际耗时设置
# 假设处理订单 P99 耗时 5 秒
lock_timeout = 15  # 5 秒 × 3
redis.set(lock_key, value, nx=True, ex=lock_timeout)

3️⃣ 使用唯一标识防止误删

问题场景:

进程 A:  [获取锁 value=A] ── [处理超时] ── [锁过期] ────── [释放锁❌]
进程 B:  ──────────────── [获取锁 value=B] ── [处理中] ──────────
                                           ↑
                                    进程 A 误删了进程 B 的锁!

错误做法:

# ❌ 直接删除,可能误删
redis.set(lock_key, "locked", nx=True, ex=30)
# ... 处理 ...
redis.delete(lock_key)  # 可能删掉别人的锁

正确做法:

# ✅ 使用唯一 ID + Lua 脚本
lock_value = str(uuid.uuid4())
redis.set(lock_key, lock_value, nx=True, ex=30)

# ... 处理 ...

# Lua 脚本保证原子性
redis.eval("""
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
""", 1, lock_key, lock_value)

4️⃣ 日志记录锁的获取和释放

便于排查问题:

# ✅ 完整的锁日志
class DistributedLock:
    async def acquire(self, key, timeout=30):
        owner = str(uuid.uuid4())
        logger.info(f"[LOCK] Trying to acquire: key={key}, owner={owner}")
        
        acquired = await redis.set(key, owner, nx=True, ex=timeout)
        
        if acquired:
            logger.info(f"[LOCK] Acquired: key={key}, owner={owner}, timeout={timeout}s")
        else:
            logger.warning(f"[LOCK] Failed to acquire: key={key}")
        
        return acquired
    
    async def release(self, key, owner):
        logger.info(f"[LOCK] Releasing: key={key}, owner={owner}")
        
        result = await redis.eval(lua_script, 1, key, owner)
        
        if result:
            logger.info(f"[LOCK] Released: key={key}, owner={owner}")
        else:
            logger.warning(f"[LOCK] Release failed (lock expired or owned by others): key={key}")

日志示例:

2024-12-25 10:00:00 [INFO] [LOCK] Trying to acquire: key=order:12345, owner=a1b2c3
2024-12-25 10:00:00 [INFO] [LOCK] Acquired: key=order:12345, owner=a1b2c3, timeout=30s
2024-12-25 10:00:15 [INFO] [LOCK] Released: key=order:12345, owner=a1b2c3

→ 可以清楚地看到锁的生命周期

5️⃣ 区分场景选择锁类型

决策树:

需要锁吗?
  ├─ 单进程 → 本地锁(threading.Lock)
  └─ 多进程/分布式
      ├─ 读多写少 → 乐观锁(版本号)
      └─ 写多 / 强一致性
          ├─ 低风险 → Redis 单机锁
          ├─ 高可用 → Redlock / Redis Cluster
          └─ 强一致性 → Etcd / ZooKeeper

🚀 总结

锁的核心价值

没有锁有锁
🙈 数据不一致🔒 数据强一致
😰 超卖、重复扣款✅ 业务逻辑正确
🔮 并发 bug 难调试🎯 可预测的行为
💸 资金损失💰 业务可靠

记忆口诀

🔒 悲观锁:先锁后操作,安全但慢
🌈 乐观锁:先操作后检查,快但可能冲突
🏠 本地锁:单进程,快
🌍 分布式锁:多进程,复杂

常见问题速查

问题解决方案
死锁全局顺序加锁 / 超时机制
活锁随机退避
锁超时看门狗续期 / 合理设置超时
惊群公平锁 / 队列
锁泄漏try-finally / 上下文管理器

行动建议

第一步:识别临界区

  • ✅ 找出所有并发访问的共享资源
  • ✅ 分析是否有数据竞争

第二步:选择合适的锁

  • ✅ 单进程 → 本地锁
  • ✅ 分布式 → 分布式锁
  • ✅ 读多写少 → 乐观锁

第三步:防御式编程

  • ✅ 设置超时
  • ✅ 使用唯一标识
  • ✅ 记录日志
  • ✅ try-finally 保证释放

第四步:测试并发场景

  • ✅ 压测验证
  • ✅ 混沌测试(故意制造失败)
  • ✅ 监控锁的持有时间

📚 延伸阅读


锁是并发的红绿灯,没有它,你的系统就是混乱的十字路口。 🚦

现在,你的系统有合适的锁机制吗? 🤔

分布式锁完全指南