高并发打赏扣费实战:Redis Lua、MQ 削峰与乐观锁兜底

Lirous.
4/15/2026
31 min read
抛弃笨重的分布式事务。探讨在瞬时高并发场景下,如何组合 Redis 原子扣减、RabbitMQ 异步解耦与 PostgreSQL 版本号防重,实现一条纯净且可靠的交易流水线。

live-interact-engine 项目的礼物打赏功能设计时,我遇到了一个每个后端都会经历的噩梦:高并发下的数据一致性问题

这不是一个单一的技术难题,而是三个互相冲突的需求同时砸了过来。我花了差不多一周的时间,从最原始的想法,到最后的三层防御架构,摸着石头过了河。

三重困局:性能 vs 一致性 vs 可靠性

让我先铺垫一下背景。直播间有个"打赏礼物"功能,用户余额足够时,可以给主播送礼物。核心逻辑很简单:

用户余额 - 礼物价格 = 新余额
礼物库存 - 1 = 新库存

但在生产环境,这个"简单"的操作会同时面对三个地狱级的约束。

痛点 1:性能的噩梦 — 数据库悲观锁把 TPS 砸烂

我一开始的设计(很朴素):

// 用户点击赠送礼物
func GiftToUser(buyerID, targetID, giftID string, count int) error {
    // 1. 查询礼物价格
    gift := db.GetGift(giftID)
    totalCost := gift.Price * count

    // 2. 查询买家余额
    buyer := db.GetUser(buyerID)
    if buyer.Balance < totalCost {
        return ErrInsufficientBalance
    }

    // 3. 扣减买家余额
    db.UpdateBalance(buyerID, buyer.Balance - totalCost)

    // 4. 扣减礼物库存
    db.UpdateGiftStock(giftID, count)

    // 5. 记录流水
    db.LogTransaction(buyerID, targetID, giftID, count)

    return nil
}

这段代码在并发压力下就是个炸弹。为什么?

想象两个并发请求来了,都是同一个用户,余额 100 元,各买 60 元的礼物:

请求 1:查余额(100)→ 检查通过 → 还没扣呢,上下文切换
请求 2:查余额(100)→ 检查通过 → 开始扣减
请求 2:扣成 40 元 → 成功
请求 1:继续执行 → 扣成 40 元 → 成功

最后结果:两笔共 120 元的消费,余额却只扣了 60 元。钱凭空蒸发了。

第一个想法:用数据库排队与乐观锁

实际上,我第一反应就是上 PostgreSQL 的悲观行锁 SELECT ... FOR UPDATE。但为了避免严重锁排队,我们一开始的真实实现采用了基于版本号的乐观锁,看起来像这样:

// 钱包数据库仓储实现(基于 Ent ORM)
type WalletRepository struct {
	client *ent.Client
}

// UpdateWallet 使用乐观锁更新
func (r *WalletRepository) UpdateWallet(ctx context.Context, wallet *domain.Wallet) error {
	// 这种更新在高并发下会导致大量版本冲突重试
	result, err := r.client.Wallet.
		Update().
		Where(
			entwallet.UserIDEQ(wallet.UserID),
			entwallet.VersionNumberEQ(wallet.VersionNumber),
		).
		SetBalance(wallet.Balance).
		SetVersionNumber(wallet.VersionNumber + 1).
		Save(ctx)

	if err != nil {
		return err
	}

	if result == 0 {
		return types.ErrVersionConflict
	}

	return nil
}

这个方案理论上能保证强一致性。但问题是——在瞬时热点流量下,直接写库会成为灾难

无论是悲观排队还是像上面这样的乐观锁,当大量并发请求同时想要更新同一行数据(买家余额)时,都会导致大量的锁竞争或版本冲突重试。整个数据库的连接池可能会瞬间打满,直接拖垮整个服务。

那时候我就想:核心的扣费判定,不能直接写库,得提速

第二个想法:纯 Redis 操作,但还是竞态条件

我想到了 Redis。把用户余额存在 Redis,速度肯定比数据库快 N 倍。但单个 Redis 命令虽然是原子的,多个命令之间仍然有竞态条件:

// 错误的做法:多个 Redis 命令无法保证原子性
func DeductBalanceIncorrect(ctx context.Context, userID string, amount int64) error {
    // 第一步:查余额
    balance, _ := redis.Get(ctx, "user:"+userID+":balance").Int64()

    // 这里有并发窗口!

    if balance < amount {
        return ErrInsufficientBalance
    }

    // 第二步:扣余额
    redis.Decrby(ctx, "user:"+userID+":balance", amount)

    return nil
}

但这还是有问题啊!Redis 操作本身虽然是原子的(单个命令),但多个命令之间还是有间隙

请求 1:GET balance(100)→ 检查通过 → 还没 DECRBY
请求 2:GET balance(100)→ 检查通过 → 也没 DECRBY
请求 2:DECRBY 扣 60 → 40
请求 1:DECRBY 扣 60 → 40

又翻车了。

那时候我开始意识到:我需要的不是快,而是"多步操作必须是一个不可分割的整体"

突破口:Redis + Lua 脚本

后来我想到了 Lua 脚本。Redis 的 EVAL 命令可以执行一段 Lua 代码,而整个脚本在 Redis 中是作为单个原子操作执行的

简单理解就是:其他客户端的命令会被阻塞,直到这段 Lua 脚本执行完。所以在 Lua 脚本里执行的多个 Redis 操作,从外部看就像是一个不可分割的操作。

为什么能保证原子性? Redis 是单线程模型。Lua 脚本在 Redis 中执行时,会持有 Redis 的执行权。其他命令都要排队等待。直到脚本执行完,下一个命令才能执行。这就保证了脚本内的所有操作要么全成功,要么全失败,不会出现中间状态。

在实战中,我把这个 Lua 脚本直接封装在了 Go 代码里,并且加入了基于 Idempotency Key (幂等键) 的防重机制

Go 语言中调用 Lua 脚本

这也是我现在工程里真正跑着的代码:

// walletCache Redis 钱包缓存实现
type walletCache struct {
	client             *redis.Client
	prefix             string // 例如 "wallet:"
	luaDeductScript    string
	luaIncrementScript string
}

// NewWalletCache 创建钱包缓存,返回实现 domain.WalletCache 接口的实例
func NewWalletCache(client *redis.Client) domain.WalletCache {
	luaDeductScript := `
		local key = KEYS[1]
		local idempotencyKey = KEYS[2]
		local amount = tonumber(ARGV[1])

		-- 检查幂等性:如果这个 idempotency_key 已经处理过,直接返回失败
		if redis.call('EXISTS', idempotencyKey) == 1 then
			return {0, redis.call('GET', key) or 0}  -- 幂等防重,不再处理
		end

		-- 获取当前余额
		local balance = tonumber(redis.call('GET', key) or 0)

		-- 检查余额是否足够
		if balance < amount then
			return {-1, balance}  -- -1 表示余额不足
		end

		-- 原子扣款
		balance = balance - amount
		redis.call('SET', key, balance)

		-- 记录幂等性(设置过期时间24小时,防止 hash 表无限增长)
		redis.call('SETEX', idempotencyKey, 86400, '1')

		return {1, balance}  -- 1 表示扣款成功,返回新余额
	`

	return &walletCache{
		client:          client,
		prefix:          "wallet:",
		luaDeductScript: luaDeductScript,
	}
}

// DeductByLua 使用 Lua 脚本原子扣款
func (c *walletCache) DeductByLua(ctx context.Context, userID uuid.UUID, amount int64, idempotencyKey uuid.UUID) (int64, error) {
	key := c.prefix + userID.String()
	idempotencyKeyStr := "idempotency:" + idempotencyKey.String()

	// 执行 Lua 脚本
	result, err := c.client.Eval(ctx, c.luaDeductScript, []string{key, idempotencyKeyStr}, amount).Result()
	if err != nil {
		return 0, err
	}

	// 解析返回值(安全的类型转换)
	results := result.([]interface{})
	code := toInt64(results[0])
	newBalance := toInt64(results[1])

	switch code {
	case 1:
		return newBalance, nil // 扣款成功
	case 0:
		return newBalance, types.ErrInvalidAmount // 幂等防重,已处理过
	case -1:
		return newBalance, types.ErrInsufficientBalance // 余额不足
	default:
		return 0, fmt.Errorf("unknown error code: %d", code)
	}
}

其实这里面有几处细节值得玩味:redis.call('GET', key) 查不到东西时返回的不是 nil 而是 false,必须通过 or 0 兜底强转;而且通过这一段脚本,我们一次性解决了并发查改防重扣款两个大问题。

我们看看外层的 Service 是怎么组合这个能力的

type WalletService struct {
	walletRepo   domain.WalletRepository
	walletCache  domain.WalletCache
	walletFilter domain.WalletFilter
}

func NewWalletService(
	walletRepo domain.WalletRepository,
	walletCache domain.WalletCache,
	walletFilter domain.WalletFilter,
) *WalletService {
	return &WalletService{
		walletRepo:   walletRepo,
		walletCache:  walletCache,
		walletFilter: walletFilter,
	}
}

// DeductBalance 使用Redis Lua原子扣款
func (s *WalletService) DeductBalance(ctx context.Context, userID uuid.UUID, amount int64, idempotencyKey uuid.UUID) (int64, error) {
	// 检查过滤器,如果未初始化则从 DB 加载
	if !s.walletFilter.Exists(ctx, userID) {
		wallet, err := s.walletRepo.GetWallet(ctx, userID)
		// 存在err且不为types.ErrWalletNotFound
		if err != nil && !errors.Is(err, types.ErrWalletNotFound) {
			return 0, err
		}

		if errors.Is(err, types.ErrWalletNotFound) {
			// 钱包不存在,创建钱包
			newWallet := &domain.Wallet{
				UserID:        userID,
				Balance:       0,
				VersionNumber: 0,
			}

			if err := s.walletRepo.CreateWallet(ctx, newWallet); err != nil {
				zap.L().Error("failed to create wallet for user",
					zap.String("user_id", userID.String()),
					zap.Error(err))
				return 0, err
			}
			wallet = newWallet
		}

		balance := wallet.Balance

		// 写入 Redis 缓存
		if err := s.walletCache.SetBalance(ctx, userID, balance); err != nil {
			return 0, err
		}

		// 标记为已初始化
		if err := s.walletFilter.Add(ctx, userID); err != nil {
			zap.L().Error("add wallet filter failed", zap.String("user_id", userID.String()), zap.Error(err))
		}
	}

	return s.walletCache.DeductByLua(ctx, userID, amount, idempotencyKey)
}

这一层我们通过缓存做过滤器,利用 Lua 脚本保证高并发 + 原子扣减。但是打赏并不仅仅是"扣钱"这么简单,还要真实地记录到数据库产生流水。在高峰流量下同步写入数据库,TPS 又会被打回原形。

第二层:RabbitMQ 异步解耦 — 避免落盘拖累主流程

我们的策略是通过 MQ 将持久化解耦。业务服务在 Redis 扣费成功后,会发布一个 gift.send.success 的事件广播到 RabbitMQ。后续落盘和流水记录,全交给 Consumer 异步削峰处理。

数据库只需要老老实实当一个小水管:

// Start 启动消费者(阻塞)
func (c *Consumer) Start(ctx context.Context) error {
	// 1. 声明并绑定礼物队列
	_, err := c.channel.QueueDeclare(
		QueueName,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		amqp.Table{
			"x-dead-letter-exchange": "gift-dlx",
		},
	)
	if err != nil {
		zap.L().Error("failed to declare gift queue", zap.Error(err))
		return err
	}

	// 绑定礼物队列到Exchange(仅订阅 gift.send.success 消息)
	err = c.channel.QueueBind(
		QueueName,
		"gift.send.success",
		ExchangeName,
		false,
		nil,
	)
	if err != nil {
		zap.L().Error("failed to bind gift queue", zap.Error(err))
		return err
	}

	// 2. 消费礼物发送事件
	giftMsgs, err := c.channel.Consume(
		QueueName,  // 队列名
		"",         // consumer tag(空表示自动生成)
		false,      // auto-ack(必须 false,手动 ack)
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	if err != nil {
		zap.L().Error("failed to consume from gift queue", zap.Error(err))
		return err
	}

	// 3. 消费消息
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-giftMsgs:
			if err := c.HandleGiftSendSuccess(context.Background(), msg.Body); err != nil {
				zap.L().Error("failed to handle gift send success", zap.Error(err))
				// 拒绝消息,重新入队
				msg.Nack(false, true)
				continue
			}
			// 消息处理成功,确认 ack
			msg.Ack(false)
		}
	}
}

削峰解耦很爽,但这里隐藏着一个深坑:RabbitMQ 的 At-least-once (至少投递一次) 保证

如果 Consumer 处理完了还没来得及 ACK 进程就挂了,MQ 心跳超时后会重新投递。这也就意味着有些事件极有可能会遭遇二次“暴击”。要是我们粗暴写库,同一笔交易就会扣费两次产生两条流水。

第三层:数据库兜底 + 领域架构 — 乐观事务幂等消费

虽然我们在 Redis 中做了 EXISTS idempotencyKey 的首道过滤,但这仍不够。为了抵御重复投递和真正的竞态修改,落盘时还需要使用数据库版本号 (Version Number) 防重。并且这绝不仅仅是跑句 SQL 就完事。

我当时就在琢磨:怎样不将庞大的底层 ORM 框架混在干净的写库逻辑中?其实核心就两点:

  1. 从数据库读取当前 Wallet(带版本号)
  2. 在同一事务里写入交易流水,不仅使用幂等键 IdempotencyKey 防重,还利用带有版本号条件 Where(UserIDEQ(...), VersionNumberEQ(...)) 更新钱包主表。

为此,我把更新操作隔离到一个专门的 Consumer 处理方法中:

// 领域层定义的事务接口(纯净,不依赖任何 ORM)
type Tx interface {
	Commit() error
	Rollback() error
}

// 消费端的处理逻辑(完全不知道 Ent、GORM 的存在)
type TransactionConsumer struct {
	walletRepo            domain.WalletRepository
	walletTransactionRepo domain.WalletTransactionRepository
	txFactory             func(ctx context.Context) (domain.Tx, error)  // 工厂函数注入
}

func (tc *TransactionConsumer) ConsumeMessage(ctx context.Context, event *events.GiftSendSuccessEvent) error {
	tx, err := tc.txFactory(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 核心逻辑:UPDATE 钱包余额(使用乐观锁)
	wallet, err := tc.walletRepo.GetWallet(ctx, event.UserID)
	if err != nil {
		return err
	}

	// 业务检查:确保余额一致(防止并发冲突)
	err = wallet.Deduct(event.Amount)
	if err != nil {
		return err
	}

	// 在事务内更新钱包(带乐观锁验证)
	// 如果版本不符,抛出 err 交给上层重试或记录
	err = tc.walletRepo.UpdateWalletTx(ctx, tx, wallet)
	if err != nil {
		return err
	}

	// 记录流水(便于对账和审计)
	txn := &domain.WalletTransaction{
		IdempotencyKey: event.IdempotencyKey,
		Type:           domain.WalletTransactionTypeGiftSend,
		PayerID:        wallet.UserID,
		Amount:         event.Amount,
		Status:         "SUCCESS",
	}

	err = tc.walletTransactionRepo.SaveWalletTransactionTx(ctx, tx, txn)
	if err != nil {
		return err
	}

	return tx.Commit()
}

为什么要自定义 Tx 接口而不是用 Ent 或 GORM 的事务

因为领域层不应该依赖具体的 ORM 框架。如果我用了 ent.Tx,那领域层就被 Ent 绑死了。将来如果要换 GORM 或其他框架,整个领域层都要改。

自定义接口让我可以灵活地注入不同的实现:

// 钱包数据库仓储实现
type WalletRepository struct {
	client *ent.Client
}

func (r *WalletRepository) Tx(ctx context.Context) (domain.Tx, error) {
	tx, err := r.client.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// ent.Tx 天然实现了 Commit() 和 Rollback(),直接返回即可
	return tx, nil
}

// UpdateWalletTx 在事务内更新钱包余额(使用乐观锁)
func (r *WalletRepository) UpdateWalletTx(ctx context.Context, tx domain.Tx, wallet *domain.Wallet) error {
	entTx := tx.(*ent.Tx)

	result, err := entTx.Wallet.
		Update().
		Where(
			entwallet.UserIDEQ(wallet.UserID),
			entwallet.VersionNumberEQ(wallet.VersionNumber),
		).
		SetBalance(wallet.Balance).
		SetVersionNumber(wallet.VersionNumber + 1).
		Save(ctx)

	if err != nil {
		return err
	}

	if result == 0 {
		return types.ErrVersionConflict
	}

	return nil
}

// 注入到 Consumer
consumer := &TransactionConsumer{
	walletRepo:            walletRepo,
	walletTransactionRepo: walletTransactionRepo,
	txFactory: func(ctx context.Context) (domain.Tx, error) {
		return walletRepo.Tx(ctx)
	},
}

这样做的好处是:领域层再也不用混杂 ORM 代码,核心处理逻辑与底层数据源彻底解耦。而且借这个机会,我甚至抛弃了封装多余结构的逻辑,既然 ent.Tx 天然支持 Commit()Rollback(),直接强制类型转换 entTx := tx.(*ent.Tx) 也能写出利落且具备乐观锁特性的 UpdateWalletTx

架构的回顾:从源头到落盘的闭环

整个礼物打赏链路可以缩影为这几步防线:

  1. Redis 一线拦截 (Lua):拦截了绝大部分并发操作,验证余额,同时以 idempotencyKey 进行第一层去重。拦截无效请求后,内存中的余额保持高响应。
  2. RabbitMQ 缓存洪峰:承担了业务解耦的角色,吸收瞬间打向数据库的并发压力。这让数据库不被压散。
  3. 版本号防线与 UPSERT 兜底 (PostgreSQL):哪怕经历网络异常 MQ 重发,在落盘时不仅有以 idempotencyKey 主键通过 OnConflictColumns 处理 UPSERT,加上检查了历史钱包版本发现 result == 0 会直接报错等待上层兜底,这一切联手保证了底层数据绝无重扣、绝无重复产生流水。

为什么要抛弃分布式事务框架?

在这套防线跑通之后,有人问我:"为什么不直接上重量级的 Saga 或者 TCC 等分布式事务框架?"

原因很简单:分布式事务框架旨在解决跨微服务的强一致性问题,但在这个场景,“强一致”是不必要的,甚至是有害的。在礼物打赏这种高并发容差场景里,把 Redis 的原子性、RabbitMQ 的异步解耦、PostgreSQL 的乐观锁重写组合起来,运用“首层拦截+消息队列排排坐+兜底防重”,不仅能够顶住洪峰,其维护成本比引入一个全链路阻塞的分布式事务要低得多。

从一开始乱加行锁把库干爆,到借用 Lua 脚本在内存里硬抗并发,再到为了解耦硬生生抠出一个纯净的自定义 Tx 接口。折腾这一遭最大的感触是:工程里哪有什么高大上的银弹架构,真正能扛事儿的系统,不过是摸清了业务底线后,老老实实把合适的破烂组件塞进对应的防线里罢了。

评论区