Browse Source

fix: restart reconciliation — trust roles, upsert replicas

Same-epoch reconciliation now trusts reported roles first:
- one claims primary, other replica → trust roles
- both claim primary → WALHeadLSN heuristic tiebreak
- both claim replica → keep existing, log ambiguity

Replaced addServerAsReplica with upsertServerAsReplica: checks
for existing replica entry by server name before appending.
Prevents duplicate ReplicaInfo rows during restart/replay windows.

2 new tests: role-trusted same-epoch, duplicate replica prevention.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
c263d082b5
  1. 64
      weed/server/master_block_registry.go
  2. 54
      weed/server/master_block_registry_test.go

64
weed/server/master_block_registry.go

@ -575,19 +575,46 @@ func (r *BlockVolumeRegistry) reconcileOnRestart(name string, existing *BlockVol
// New server has a lower epoch — add as replica.
glog.V(0).Infof("block registry: reconcile %q: new server %s epoch %d < existing %s epoch %d, adding as replica",
name, newServer, newEpoch, existing.VolumeServer, oldEpoch)
r.addServerAsReplica(name, existing, newServer, info)
r.upsertServerAsReplica(name, existing, newServer, info)
return
}
// Same epoch — ambiguous. Use WALHeadLSN as heuristic tiebreaker.
if info.WalHeadLsn > existing.WALHeadLSN {
glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — new server %s LSN %d > existing %s LSN %d, promoting new (heuristic)",
name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN)
// Same epoch — trust reported roles first.
existingIsPrimary := existing.Role == blockvol.RoleToWire(blockvol.RolePrimary)
newIsPrimary := info.Role == blockvol.RoleToWire(blockvol.RolePrimary)
if existingIsPrimary && !newIsPrimary {
// Existing claims primary, new claims replica — trust roles.
glog.V(0).Infof("block registry: reconcile %q: same epoch %d, existing %s is primary, new %s is replica — keeping existing",
name, newEpoch, existing.VolumeServer, newServer)
r.upsertServerAsReplica(name, existing, newServer, info)
return
}
if !existingIsPrimary && newIsPrimary {
// New claims primary, existing is not — trust roles.
glog.V(0).Infof("block registry: reconcile %q: same epoch %d, new %s claims primary, existing %s does not — promoting new",
name, newEpoch, newServer, existing.VolumeServer)
r.demoteExistingToReplica(name, existing, newServer, info)
return
}
// Both claim primary or both claim replica — ambiguous. Use WALHeadLSN as heuristic.
if newIsPrimary {
// Both claim primary.
if info.WalHeadLsn > existing.WALHeadLSN {
glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, both primary — new %s LSN %d > existing %s LSN %d, promoting new (heuristic)",
name, newEpoch, newServer, info.WalHeadLsn, existing.VolumeServer, existing.WALHeadLSN)
r.demoteExistingToReplica(name, existing, newServer, info)
} else {
glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, both primary — existing %s LSN %d >= new %s LSN %d, keeping existing (heuristic)",
name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn)
r.upsertServerAsReplica(name, existing, newServer, info)
}
} else {
glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d — existing %s LSN %d >= new server %s LSN %d, keeping existing (heuristic)",
name, newEpoch, existing.VolumeServer, existing.WALHeadLSN, newServer, info.WalHeadLsn)
r.addServerAsReplica(name, existing, newServer, info)
// Both claim replica — no primary known. Keep existing, log ambiguity.
glog.Warningf("block registry: reconcile %q: AMBIGUOUS same epoch %d, neither claims primary — keeping existing %s, adding new %s as replica",
name, newEpoch, existing.VolumeServer, newServer)
r.upsertServerAsReplica(name, existing, newServer, info)
}
}
@ -637,9 +664,26 @@ func (r *BlockVolumeRegistry) demoteExistingToReplica(name string, existing *Blo
}
}
// addServerAsReplica adds the new server as a replica for the existing entry.
// upsertServerAsReplica adds or updates the server as a replica for the existing entry.
// If the server already exists in Replicas[], its fields are updated instead of appending
// a duplicate. This prevents duplicate replica entries during restart/replay windows.
// Caller must hold r.mu.
func (r *BlockVolumeRegistry) addServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
func (r *BlockVolumeRegistry) upsertServerAsReplica(name string, existing *BlockVolumeEntry, newServer string, info *master_pb.BlockVolumeInfoMessage) {
// Check for existing replica entry for this server.
for i := range existing.Replicas {
if existing.Replicas[i].Server == newServer {
// Update in place.
existing.Replicas[i].Path = info.Path
existing.Replicas[i].HealthScore = info.HealthScore
existing.Replicas[i].WALHeadLSN = info.WalHeadLsn
existing.Replicas[i].LastHeartbeat = time.Now()
existing.Replicas[i].Role = info.Role
existing.Replicas[i].NvmeAddr = info.NvmeAddr
existing.Replicas[i].NQN = info.Nqn
return
}
}
// New replica — append.
ri := ReplicaInfo{
Server: newServer,
Path: info.Path,

54
weed/server/master_block_registry_test.go

@ -1748,6 +1748,60 @@ func TestMasterRestart_ReplicaHeartbeat_AddedCorrectly(t *testing.T) {
}
}
func TestMasterRestart_SameEpoch_RoleTrusted(t *testing.T) {
r := NewBlockVolumeRegistry()
// First heartbeat: vs1 claims primary, epoch 5, LSN 50.
r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 50, VolumeSize: 1 << 30},
}, "")
// Second heartbeat: vs2 claims replica (Role=2), same epoch 5, HIGHER LSN.
// Even though LSN is higher, it reports Role=replica, so it should NOT become primary.
r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 200, VolumeSize: 1 << 30},
}, "")
entry, _ := r.Lookup("vol1")
if entry.VolumeServer != "vs1:9333" {
t.Fatalf("expected vs1 (claims primary) to stay primary, got %q", entry.VolumeServer)
}
if len(entry.Replicas) != 1 || entry.Replicas[0].Server != "vs2:9333" {
t.Fatalf("expected vs2 as replica, got %+v", entry.Replicas)
}
}
func TestMasterRestart_DuplicateReplicaHeartbeat_NoDuplicate(t *testing.T) {
r := NewBlockVolumeRegistry()
// Primary heartbeat.
r.UpdateFullHeartbeat("vs1:9333", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 5, Role: 1, WalHeadLsn: 100, VolumeSize: 1 << 30},
}, "")
// Replica heartbeat — first time.
r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 90, VolumeSize: 1 << 30, HealthScore: 0.8},
}, "")
// Same replica heartbeat again — should update, not duplicate.
r.UpdateFullHeartbeat("vs2:9333", []*master_pb.BlockVolumeInfoMessage{
{Path: "/data/vol1.blk", Epoch: 5, Role: 2, WalHeadLsn: 95, VolumeSize: 1 << 30, HealthScore: 0.9},
}, "")
entry, _ := r.Lookup("vol1")
if len(entry.Replicas) != 1 {
t.Fatalf("expected 1 replica (no duplicates), got %d", len(entry.Replicas))
}
// Should have the updated values from the second heartbeat.
if entry.Replicas[0].WALHeadLSN != 95 {
t.Fatalf("expected updated LSN 95, got %d", entry.Replicas[0].WALHeadLSN)
}
if entry.Replicas[0].HealthScore != 0.9 {
t.Fatalf("expected updated health 0.9, got %f", entry.Replicas[0].HealthScore)
}
}
// --- Copy semantics tests (pointer escape fix) ---
func TestLookup_ReturnsCopy(t *testing.T) {

Loading…
Cancel
Save