live-interact-engine
项目的礼物打赏功能设计时,我遇到了一个每个后端都会经历的噩梦:高并发下的数据一致性问题。
这不是一个单一的技术难题,而是三个互相冲突的需求同时砸了过来。我花了差不多一周的时间,从最原始的想法,到最后的三层防御架构,摸着石头过了河。
三重困局:性能 vs 一致性 vs 可靠性
让我先铺垫一下背景。直播间有个"打赏礼物"功能,用户余额足够时,可以给主播送礼物。核心逻辑很简单:
用户余额 - 礼物价格 = 新余额
礼物库存 - 1 = 新库存
但在生产环境,这个"简单"的操作会同时面对三个地狱级的约束。
痛点 1:性能的噩梦 — 数据库悲观锁把 TPS 砸烂
我一开始的设计(很朴素):
// 用户点击赠送礼物
func GiftToUser(buyerID, targetID, giftID string, count int) error {
// 1. 查询礼物价格
gift := db.GetGift(giftID)
totalCost := gift.Price * count
// 2. 查询买家余额
buyer := db.GetUser(buyerID)
if buyer.Balance < totalCost {
return ErrInsufficientBalance
}
// 3. 扣减买家余额
db.UpdateBalance(buyerID, buyer.Balance - totalCost)
// 4. 扣减礼物库存
db.UpdateGiftStock(giftID, count)
// 5. 记录流水
db.LogTransaction(buyerID, targetID, giftID, count)
return nil
}
这段代码在并发压力下就是个炸弹。为什么?
想象两个并发请求来了,都是同一个用户,余额 100 元,各买 60 元的礼物:
请求 1:查余额(100)→ 检查通过 → 还没扣呢,上下文切换
请求 2:查余额(100)→ 检查通过 → 开始扣减
请求 2:扣成 40 元 → 成功
请求 1:继续执行 → 扣成 40 元 → 成功
最后结果:两笔共 120 元的消费,余额却只扣了 60 元。钱凭空蒸发了。
第一个想法:用数据库排队与乐观锁
实际上,我第一反应就是上 PostgreSQL 的悲观行锁
SELECT ... FOR UPDATE。但为了避免严重锁排队,我们一开始的真实实现采用了基于版本号的乐观锁,看起来像这样:
// 钱包数据库仓储实现(基于 Ent ORM)
type WalletRepository struct {
client *ent.Client
}
// UpdateWallet 使用乐观锁更新
func (r *WalletRepository) UpdateWallet(ctx context.Context, wallet *domain.Wallet) error {
// 这种更新在高并发下会导致大量版本冲突重试
result, err := r.client.Wallet.
Update().
Where(
entwallet.UserIDEQ(wallet.UserID),
entwallet.VersionNumberEQ(wallet.VersionNumber),
).
SetBalance(wallet.Balance).
SetVersionNumber(wallet.VersionNumber + 1).
Save(ctx)
if err != nil {
return err
}
if result == 0 {
return types.ErrVersionConflict
}
return nil
}
这个方案理论上能保证强一致性。但问题是——在瞬时热点流量下,直接写库会成为灾难。
无论是悲观排队还是像上面这样的乐观锁,当大量并发请求同时想要更新同一行数据(买家余额)时,都会导致大量的锁竞争或版本冲突重试。整个数据库的连接池可能会瞬间打满,直接拖垮整个服务。
那时候我就想:核心的扣费判定,不能直接写库,得提速。
第二个想法:纯 Redis 操作,但还是竞态条件
我想到了 Redis。把用户余额存在 Redis,速度肯定比数据库快 N 倍。但单个 Redis 命令虽然是原子的,多个命令之间仍然有竞态条件:
// 错误的做法:多个 Redis 命令无法保证原子性
func DeductBalanceIncorrect(ctx context.Context, userID string, amount int64) error {
// 第一步:查余额
balance, _ := redis.Get(ctx, "user:"+userID+":balance").Int64()
// 这里有并发窗口!
if balance < amount {
return ErrInsufficientBalance
}
// 第二步:扣余额
redis.Decrby(ctx, "user:"+userID+":balance", amount)
return nil
}
但这还是有问题啊!Redis 操作本身虽然是原子的(单个命令),但多个命令之间还是有间隙。
请求 1:GET balance(100)→ 检查通过 → 还没 DECRBY
请求 2:GET balance(100)→ 检查通过 → 也没 DECRBY
请求 2:DECRBY 扣 60 → 40
请求 1:DECRBY 扣 60 → 40
又翻车了。
那时候我开始意识到:我需要的不是快,而是"多步操作必须是一个不可分割的整体"。
突破口:Redis + Lua 脚本
后来我想到了 Lua 脚本。Redis 的 EVAL 命令可以执行一段 Lua 代码,而整个脚本在 Redis 中是作为单个原子操作执行的。
简单理解就是:其他客户端的命令会被阻塞,直到这段 Lua 脚本执行完。所以在 Lua 脚本里执行的多个 Redis 操作,从外部看就像是一个不可分割的操作。
为什么能保证原子性? Redis 是单线程模型。Lua 脚本在 Redis 中执行时,会持有 Redis 的执行权。其他命令都要排队等待。直到脚本执行完,下一个命令才能执行。这就保证了脚本内的所有操作要么全成功,要么全失败,不会出现中间状态。
在实战中,我把这个 Lua 脚本直接封装在了 Go 代码里,并且加入了基于 Idempotency Key (幂等键) 的防重机制。
Go 语言中调用 Lua 脚本
这也是我现在工程里真正跑着的代码:
// walletCache Redis 钱包缓存实现
type walletCache struct {
client *redis.Client
prefix string // 例如 "wallet:"
luaDeductScript string
luaIncrementScript string
}
// NewWalletCache 创建钱包缓存,返回实现 domain.WalletCache 接口的实例
func NewWalletCache(client *redis.Client) domain.WalletCache {
luaDeductScript := `
local key = KEYS[1]
local idempotencyKey = KEYS[2]
local amount = tonumber(ARGV[1])
-- 检查幂等性:如果这个 idempotency_key 已经处理过,直接返回失败
if redis.call('EXISTS', idempotencyKey) == 1 then
return {0, redis.call('GET', key) or 0} -- 幂等防重,不再处理
end
-- 获取当前余额
local balance = tonumber(redis.call('GET', key) or 0)
-- 检查余额是否足够
if balance < amount then
return {-1, balance} -- -1 表示余额不足
end
-- 原子扣款
balance = balance - amount
redis.call('SET', key, balance)
-- 记录幂等性(设置过期时间24小时,防止 hash 表无限增长)
redis.call('SETEX', idempotencyKey, 86400, '1')
return {1, balance} -- 1 表示扣款成功,返回新余额
`
return &walletCache{
client: client,
prefix: "wallet:",
luaDeductScript: luaDeductScript,
}
}
// DeductByLua 使用 Lua 脚本原子扣款
func (c *walletCache) DeductByLua(ctx context.Context, userID uuid.UUID, amount int64, idempotencyKey uuid.UUID) (int64, error) {
key := c.prefix + userID.String()
idempotencyKeyStr := "idempotency:" + idempotencyKey.String()
// 执行 Lua 脚本
result, err := c.client.Eval(ctx, c.luaDeductScript, []string{key, idempotencyKeyStr}, amount).Result()
if err != nil {
return 0, err
}
// 解析返回值(安全的类型转换)
results := result.([]interface{})
code := toInt64(results[0])
newBalance := toInt64(results[1])
switch code {
case 1:
return newBalance, nil // 扣款成功
case 0:
return newBalance, types.ErrInvalidAmount // 幂等防重,已处理过
case -1:
return newBalance, types.ErrInsufficientBalance // 余额不足
default:
return 0, fmt.Errorf("unknown error code: %d", code)
}
}
其实这里面有几处细节值得玩味:redis.call('GET', key)
查不到东西时返回的不是 nil 而是 false,必须通过 or 0
兜底强转;而且通过这一段脚本,我们一次性解决了并发查改和防重扣款两个大问题。
我们看看外层的 Service 是怎么组合这个能力的:
type WalletService struct {
walletRepo domain.WalletRepository
walletCache domain.WalletCache
walletFilter domain.WalletFilter
}
func NewWalletService(
walletRepo domain.WalletRepository,
walletCache domain.WalletCache,
walletFilter domain.WalletFilter,
) *WalletService {
return &WalletService{
walletRepo: walletRepo,
walletCache: walletCache,
walletFilter: walletFilter,
}
}
// DeductBalance 使用Redis Lua原子扣款
func (s *WalletService) DeductBalance(ctx context.Context, userID uuid.UUID, amount int64, idempotencyKey uuid.UUID) (int64, error) {
// 检查过滤器,如果未初始化则从 DB 加载
if !s.walletFilter.Exists(ctx, userID) {
wallet, err := s.walletRepo.GetWallet(ctx, userID)
// 存在err且不为types.ErrWalletNotFound
if err != nil && !errors.Is(err, types.ErrWalletNotFound) {
return 0, err
}
if errors.Is(err, types.ErrWalletNotFound) {
// 钱包不存在,创建钱包
newWallet := &domain.Wallet{
UserID: userID,
Balance: 0,
VersionNumber: 0,
}
if err := s.walletRepo.CreateWallet(ctx, newWallet); err != nil {
zap.L().Error("failed to create wallet for user",
zap.String("user_id", userID.String()),
zap.Error(err))
return 0, err
}
wallet = newWallet
}
balance := wallet.Balance
// 写入 Redis 缓存
if err := s.walletCache.SetBalance(ctx, userID, balance); err != nil {
return 0, err
}
// 标记为已初始化
if err := s.walletFilter.Add(ctx, userID); err != nil {
zap.L().Error("add wallet filter failed", zap.String("user_id", userID.String()), zap.Error(err))
}
}
return s.walletCache.DeductByLua(ctx, userID, amount, idempotencyKey)
}
这一层我们通过缓存做过滤器,利用 Lua 脚本保证高并发 + 原子扣减。但是打赏并不仅仅是"扣钱"这么简单,还要真实地记录到数据库产生流水。在高峰流量下同步写入数据库,TPS 又会被打回原形。
第二层:RabbitMQ 异步解耦 — 避免落盘拖累主流程
我们的策略是通过 MQ 将持久化解耦。业务服务在 Redis 扣费成功后,会发布一个
gift.send.success
的事件广播到 RabbitMQ。后续落盘和流水记录,全交给 Consumer 异步削峰处理。
数据库只需要老老实实当一个小水管:
// Start 启动消费者(阻塞)
func (c *Consumer) Start(ctx context.Context) error {
// 1. 声明并绑定礼物队列
_, err := c.channel.QueueDeclare(
QueueName,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp.Table{
"x-dead-letter-exchange": "gift-dlx",
},
)
if err != nil {
zap.L().Error("failed to declare gift queue", zap.Error(err))
return err
}
// 绑定礼物队列到Exchange(仅订阅 gift.send.success 消息)
err = c.channel.QueueBind(
QueueName,
"gift.send.success",
ExchangeName,
false,
nil,
)
if err != nil {
zap.L().Error("failed to bind gift queue", zap.Error(err))
return err
}
// 2. 消费礼物发送事件
giftMsgs, err := c.channel.Consume(
QueueName, // 队列名
"", // consumer tag(空表示自动生成)
false, // auto-ack(必须 false,手动 ack)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
zap.L().Error("failed to consume from gift queue", zap.Error(err))
return err
}
// 3. 消费消息
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg := <-giftMsgs:
if err := c.HandleGiftSendSuccess(context.Background(), msg.Body); err != nil {
zap.L().Error("failed to handle gift send success", zap.Error(err))
// 拒绝消息,重新入队
msg.Nack(false, true)
continue
}
// 消息处理成功,确认 ack
msg.Ack(false)
}
}
}
削峰解耦很爽,但这里隐藏着一个深坑:RabbitMQ 的 At-least-once (至少投递一次) 保证。
如果 Consumer 处理完了还没来得及 ACK 进程就挂了,MQ 心跳超时后会重新投递。这也就意味着有些事件极有可能会遭遇二次“暴击”。要是我们粗暴写库,同一笔交易就会扣费两次产生两条流水。
第三层:数据库兜底 + 领域架构 — 乐观事务幂等消费
虽然我们在 Redis 中做了 EXISTS idempotencyKey
的首道过滤,但这仍不够。为了抵御重复投递和真正的竞态修改,落盘时还需要使用数据库版本号 (Version
Number) 防重。并且这绝不仅仅是跑句 SQL 就完事。
我当时就在琢磨:怎样不将庞大的底层 ORM 框架混在干净的写库逻辑中?其实核心就两点:
- 从数据库读取当前 Wallet(带版本号)
- 在同一事务里写入交易流水,不仅使用幂等键
IdempotencyKey防重,还利用带有版本号条件Where(UserIDEQ(...), VersionNumberEQ(...))更新钱包主表。
为此,我把更新操作隔离到一个专门的 Consumer 处理方法中:
// 领域层定义的事务接口(纯净,不依赖任何 ORM)
type Tx interface {
Commit() error
Rollback() error
}
// 消费端的处理逻辑(完全不知道 Ent、GORM 的存在)
type TransactionConsumer struct {
walletRepo domain.WalletRepository
walletTransactionRepo domain.WalletTransactionRepository
txFactory func(ctx context.Context) (domain.Tx, error) // 工厂函数注入
}
func (tc *TransactionConsumer) ConsumeMessage(ctx context.Context, event *events.GiftSendSuccessEvent) error {
tx, err := tc.txFactory(ctx)
if err != nil {
return err
}
defer tx.Rollback()
// 核心逻辑:UPDATE 钱包余额(使用乐观锁)
wallet, err := tc.walletRepo.GetWallet(ctx, event.UserID)
if err != nil {
return err
}
// 业务检查:确保余额一致(防止并发冲突)
err = wallet.Deduct(event.Amount)
if err != nil {
return err
}
// 在事务内更新钱包(带乐观锁验证)
// 如果版本不符,抛出 err 交给上层重试或记录
err = tc.walletRepo.UpdateWalletTx(ctx, tx, wallet)
if err != nil {
return err
}
// 记录流水(便于对账和审计)
txn := &domain.WalletTransaction{
IdempotencyKey: event.IdempotencyKey,
Type: domain.WalletTransactionTypeGiftSend,
PayerID: wallet.UserID,
Amount: event.Amount,
Status: "SUCCESS",
}
err = tc.walletTransactionRepo.SaveWalletTransactionTx(ctx, tx, txn)
if err != nil {
return err
}
return tx.Commit()
}
为什么要自定义 Tx 接口而不是用 Ent 或 GORM 的事务?
因为领域层不应该依赖具体的 ORM 框架。如果我用了
ent.Tx,那领域层就被 Ent 绑死了。将来如果要换 GORM 或其他框架,整个领域层都要改。
自定义接口让我可以灵活地注入不同的实现:
// 钱包数据库仓储实现
type WalletRepository struct {
client *ent.Client
}
func (r *WalletRepository) Tx(ctx context.Context) (domain.Tx, error) {
tx, err := r.client.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
// ent.Tx 天然实现了 Commit() 和 Rollback(),直接返回即可
return tx, nil
}
// UpdateWalletTx 在事务内更新钱包余额(使用乐观锁)
func (r *WalletRepository) UpdateWalletTx(ctx context.Context, tx domain.Tx, wallet *domain.Wallet) error {
entTx := tx.(*ent.Tx)
result, err := entTx.Wallet.
Update().
Where(
entwallet.UserIDEQ(wallet.UserID),
entwallet.VersionNumberEQ(wallet.VersionNumber),
).
SetBalance(wallet.Balance).
SetVersionNumber(wallet.VersionNumber + 1).
Save(ctx)
if err != nil {
return err
}
if result == 0 {
return types.ErrVersionConflict
}
return nil
}
// 注入到 Consumer
consumer := &TransactionConsumer{
walletRepo: walletRepo,
walletTransactionRepo: walletTransactionRepo,
txFactory: func(ctx context.Context) (domain.Tx, error) {
return walletRepo.Tx(ctx)
},
}
这样做的好处是:领域层再也不用混杂 ORM 代码,核心处理逻辑与底层数据源彻底解耦。而且借这个机会,我甚至抛弃了封装多余结构的逻辑,既然
ent.Tx 天然支持 Commit() 和 Rollback(),直接强制类型转换
entTx := tx.(*ent.Tx) 也能写出利落且具备乐观锁特性的
UpdateWalletTx。
架构的回顾:从源头到落盘的闭环
整个礼物打赏链路可以缩影为这几步防线:
- Redis 一线拦截 (Lua):拦截了绝大部分并发操作,验证余额,同时以
idempotencyKey进行第一层去重。拦截无效请求后,内存中的余额保持高响应。 - RabbitMQ 缓存洪峰:承担了业务解耦的角色,吸收瞬间打向数据库的并发压力。这让数据库不被压散。
- 版本号防线与 UPSERT 兜底 (PostgreSQL):哪怕经历网络异常 MQ 重发,在落盘时不仅有以
idempotencyKey主键通过OnConflictColumns处理 UPSERT,加上检查了历史钱包版本发现result == 0会直接报错等待上层兜底,这一切联手保证了底层数据绝无重扣、绝无重复产生流水。
为什么要抛弃分布式事务框架?
在这套防线跑通之后,有人问我:"为什么不直接上重量级的 Saga 或者 TCC 等分布式事务框架?"
原因很简单:分布式事务框架旨在解决跨微服务的强一致性问题,但在这个场景,“强一致”是不必要的,甚至是有害的。在礼物打赏这种高并发容差场景里,把 Redis 的原子性、RabbitMQ 的异步解耦、PostgreSQL 的乐观锁重写组合起来,运用“首层拦截+消息队列排排坐+兜底防重”,不仅能够顶住洪峰,其维护成本比引入一个全链路阻塞的分布式事务要低得多。
从一开始乱加行锁把库干爆,到借用 Lua 脚本在内存里硬抗并发,再到为了解耦硬生生抠出一个纯净的自定义
Tx
接口。折腾这一遭最大的感触是:工程里哪有什么高大上的银弹架构,真正能扛事儿的系统,不过是摸清了业务底线后,老老实实把合适的破烂组件塞进对应的防线里罢了。