帧同步房间 — Room / InputBuf / HashAgg / AdaptiveN
基于 rts-server-golang lockstep/ 模块解析 关键词: 帧同步, Room actor, InputBuf, HashAgg, AdaptiveN, Desync检测, 断线重连
概述
帧同步房间(Room)是游戏的逻辑核心,负责按固定帧率推进游戏、收集广播玩家输入、检测 desync、处理重连。
帧同步的核心思想:服务端只转发输入,不计算结果。客户端收到所有玩家的输入后,本地执行模拟。因为大家跑同一份代码、同一组输入、同一颗 seed,结果必然一致。
为什么用帧同步
| 方案 | 服务端 | 客户端 | 适用 |
|---|---|---|---|
| 状态同步 | 保存完整世界状态 | 只展示 | MMORPG、卡牌 |
| 帧同步 | 只转发输入 | 本地计算 | MOBA、FPS、RTS |
帧同步的优势:服务端负载极低、延迟低、断线重连快。
Room 核心架构
每个 Room 一个 goroutine,没有锁
│
├─ Inbox chan RoomMsg ← 接收玩家消息
├─ InputBuf ← 按 tick 整理输入
├─ HashAgg ← 收集 hash 检测 desync
├─ AdaptiveN ← 动态调整输入延迟
├─ sim.World ← 确定性模拟(服务端也跑一遍验证)
└─ FrameHistory [] ← 环 buffer,保留最近 200 帧Tick 循环
go
func (r *Room) Run(ctx context.Context) {
startTime := time.Now()
for {
// Tick N 的目标绝对时间
nextTick := startTime.Add(time.Duration(r.tick+1) * r.tickInterval)
sleepDur := time.Until(nextTick)
if sleepDur > 0 {
timer := time.NewTimer(sleepDur)
select {
case <-ctx.Done(): return // 房间关闭
case msg := <-r.Inbox: // 收到玩家消息
timer.Stop()
r.handleMsg(msg)
r.drainInbox() // 抽干 Inbox 里所有消息
continue // 继续睡,等下一个 tick
case <-timer.C: // 时间到了
}
}
// 时间到(或已落后):抽干消息 → 封帧广播
r.drainInbox()
r.sealTick()
}
}关键:Tick 定时是绝对的,不是相对的。
Tick N 的目标时间 = startTime + N * tickInterval
Tick 1 = startTime + 33ms
Tick 2 = startTime + 66ms (30 FPS)
Tick 3 = startTime + 99ms如果 Tick 3 的目标时间已到但 Tick 2 还没处理完,说明落后了,立即 tick 不等待。
InputBuf — 每帧输入收集
为什么需要
帧同步要求同一帧内所有玩家的命令同时执行:
玩家A在 tick=50 发: Move Unit 1 to (10, 20)
玩家B在 tick=50 发: Attack Unit 3 with Unit 1
服务器 tick=50: A 和 B 的命令同时执行
客户端A: A 和 B 的命令同时执行
客户端B: A 和 B 的命令同时执行
→ 三方结果完全一致实现
go
type InputBuf struct {
buckets map[uint32][]wire.Cmd // tick → 该帧所有玩家的命令
}
// 收到玩家命令
func (ib *InputBuf) Add(cmd wire.Cmd) {
ib.buckets[cmd.Tick] = append(ib.buckets[cmd.Tick], cmd)
}
// 封帧:取出该 tick 所有命令,并清空
func (ib *InputBuf) Seal(tick uint32) []wire.Cmd {
cmds := ib.buckets[tick]
delete(ib.buckets, tick)
return cmds
}封帧时机
go
func (r *Room) sealTick() {
r.tick++ // tick=50 → tick=51
cmds := r.inputBuf.Seal(r.tick) // 取出 tick=51 的所有命令
// → 所有人同时执行 tick=51 的命令
}Seal 后清空该 tick 的 bucket,不可逆。
AdaptiveN — 动态输入延迟
为什么需要
所有玩家必须同时执行同一帧,但:
- 玩家网络不同(RTT 20ms vs 200ms)
- 服务器广播有延迟
- 玩家收到 FrameBundle 有先后
如果等最慢的玩家,会卡住所有人。
解决:输入延迟 N 帧
客户端在 tick=50 时:
→ 本地执行 tick=50 - N 的逻辑(提前算)
→ 同时发送 tick=50 + N 的命令(延迟发)服务器收到命令时,已经是 tick=50+N 了,直接 Seal 就刚好对齐。
公式
N = ceil(maxRTT / tickInterval) + 1
tickInterval = 1000ms / 30 = 33ms
maxRTT = 100ms
N = ceil(100/33) + 1 = 4 + 1 = 5 帧每帧所有客户端都有 5 帧的缓冲时间,收到慢的也来得及。
实现
go
func (a *AdaptiveN) Recalculate() (uint8, bool) {
// 找所有玩家中 RTT 最大的
var maxRTT time.Duration
for _, rtt := range a.rttSamples {
if rtt > maxRTT { maxRTT = rtt }
}
tickInterval := time.Second / time.Duration(a.tickRate)
needed := int(maxRTT/tickInterval) + 1 + 1 // +1 ceil, +1 安全边际
// 限制在 [minN, maxN] 范围内
if needed < int(a.minN) { needed = int(a.minN) }
if needed > int(a.maxN) { needed = int(a.maxN) }
newN := uint8(needed)
if newN == a.current { return a.current, false }
a.current = newN
return newN, true
}N 变化通知
go
// sealTick 中每 20 ticks 检查一次
if newN, changed := r.adaptiveN.Recalculate(); changed {
npub := &wire.NPub{
EffectiveFromTick: r.tick + uint32(newN)*2, // 延迟生效
N: newN,
}
broadcast NPub
}EffectiveFromTick 是为了让 N 变化平滑落地——不是立刻生效,而是等两端都就绪。
HashAgg — Desync 检测
为什么需要
帧同步最大风险:各端计算结果不一致
原因:
- 随机数不同步 (服务端和客户端 seed 不同)
- 浮点精度不同 (Go float64 vs JS float64)
- 计算顺序不确定 (A 的命令先执行还是 B 的先执行)原理
每帧执行完后,每个客户端计算 hash(world) 汇报给服务器:
- 所有玩家的 hash 一致 → 没 desync
- 有人 hash 不一样 → desync
实现
go
type tickHash struct {
tick uint32
hashes map[uint8]uint64 // playerID → hash
expected int // 期望收到多少个
state HashState // Pending/Partial/Match/Desync/Timeout
}状态机
CreateSlot(tick=50):
tickHash{state: HashPending, hashes: {}, expected: 2}
收到玩家A的 HashAck(tick=50, hash=A的hash):
state = HashPartial (还没收齐)
收到玩家B的 HashAck(tick=50, hash=B的hash):
比对: A的hash == B的hash?
是 → state = HashMatch
否 → state = HashDesync ← 报警!滑动窗口
只保留最近 100 个 tick 的 hash 槽,防止内存无限增长:
go
if tick > uint32(h.windowSize) {
cutoff := tick - uint32(h.windowSize)
for t := range h.window {
if t < cutoff { delete(h.window, t) }
}
}完整 Tick 生命周期
Tick N 的完整流程:
1. drainInbox()
└─ 处理所有排队的 MsgCmd → inputBuf.Add(cmd)
└─ 处理 MsgHashAck → hashAgg.Report
└─ 处理 MsgRTTReport → adaptiveN.ReportRTT
2. sealTick()
├─ r.tick++
├─ cmds = inputBuf.Seal(tick) ← 取出该帧所有命令
├─ fb = wire.FrameBundle{Tick, N, Cmds}
├─ r.hashAgg.CreateSlot(tick) ← 预建 hash 槽
├─ r.hashAgg.CheckTimeouts() ← 清理超时的 hash
├─ broadcast FrameBundle ← 发给所有玩家
├─ 每20 ticks: recalculate N ← 调参
├─ sim.Step(world, cmds) ← 服务端也跑一遍(验证用)
└─ replayWriter.WriteTick() ← 录制
客户端收到 FrameBundle 后:
1. 缓存帧 (buffer[N] = frame)
2. 执行 tick = N - N 的本地逻辑
3. 执行完发送 HashAck(tick=N, hash=worldHash)断线重连
三个状态
Connected: 正常游戏
Disconnected: 网络断开,slot 进 limbo(保留 30 秒)
Reconnecting: 重连成功,恢复游戏Limbo 机制
go
func (r *Room) HandleDisconnect(playerID uint8) {
// slot 不删除,只清空 Conn
r.limbo = append(r.limbo, limboSlot{
playerID: playerID,
disconnAt: time.Now(),
})
}追赶策略
go
behind := int(r.tick - resume.LastExecutedTick)
if behind > 100 || behind < 0 {
// 落后太多:发全量快照
resync.Snapshot = sim.Marshal(r.world) // 整个世界序列化
} else {
// 落后不多:发增量帧
resync.Frames = r.FrameHistory(lastExec)
}FrameHistory 从环形 buffer 里取 [lastExec+1, currentTick] 的所有帧。
相关
- 项目源码: rts-server-golang
/internal/lockstep/ - 上篇: [[可靠UDP传输层]]
- 下篇: [[确定性模拟]]