Browse Source

fix: B-09 stale entry during expand, B-10 heartbeat deletes during expand

B-09: ExpandBlockVolume re-reads the registry entry after acquiring
the expand inflight lock. Previously it used the entry from the
initial Lookup, which could be stale if failover changed VolumeServer
or Replicas between Lookup and PREPARE.

B-10: UpdateFullHeartbeat stale-cleanup now skips entries with
ExpandInProgress=true. Previously a primary VS restart during
coordinated expand would delete the entry (path not in heartbeat),
orphaning the volume and stranding the expand coordinator.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/sw-block
Ping Qiu 2 days ago
parent
commit
67f6e73ca7
  1. 8
      weed/server/master_block_registry.go
  2. 9
      weed/server/master_grpc_server_block.go
  3. 131
      weed/server/master_grpc_server_block_test.go

8
weed/server/master_block_registry.go

@ -325,6 +325,14 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master
if entry.VolumeServer == server {
// Server is the primary: check if primary path is reported.
if _, found := reported[entry.Path]; !found {
// B-10: Do not delete entries with a coordinated expand in flight.
// The primary may have restarted mid-expand; deleting the entry
// would orphan the volume and strand the expand coordinator.
if entry.ExpandInProgress {
glog.Warningf("block registry: skipping stale-cleanup for %q (ExpandInProgress=true, server=%s)",
name, server)
continue
}
delete(r.volumes, name)
delete(names, name)
// Also clean up replica entries from byServer.

9
weed/server/master_grpc_server_block.go

@ -409,6 +409,15 @@ func (ms *MasterServer) ExpandBlockVolume(ctx context.Context, req *master_pb.Ex
}
}()
// B-09: Re-read entry after acquiring expand lock. Between the initial
// Lookup and AcquireExpandInflight, failover may have changed VolumeServer
// or Replicas. Using the stale snapshot would send PREPARE to dead nodes.
entry, ok = ms.blockRegistry.Lookup(req.Name)
if !ok {
expandClean = true
return nil, fmt.Errorf("block volume %q disappeared during expand", req.Name)
}
// Track prepared nodes for rollback.
var prepared []string

131
weed/server/master_grpc_server_block_test.go

@ -1652,3 +1652,134 @@ func TestMaster_ExpandCoordinated_RestartRecovery(t *testing.T) {
t.Fatalf("capacity: got %d", resp.CapacityBytes)
}
}
// TestMaster_ExpandCoordinated_B09_ReReadsEntryAfterLock checks that an expand
// issued after a failover sends PREPARE to the promoted primary first and never
// contacts the original primary.
//
// RF=3 is used so that, after one replica is promoted, the entry still has at
// least one replica and the coordinated expand path is taken.
//
// NOTE(review): the promotion below completes before ExpandBlockVolume is
// called, so the handler's initial Lookup already sees the post-failover
// entry. This test pins the end-to-end outcome but does not by itself force
// the window between the initial Lookup and AcquireExpandInflight that B-09
// describes — confirm whether a hook to inject failover inside that window is
// wanted.
func TestMaster_ExpandCoordinated_B09_ReReadsEntryAfterLock(t *testing.T) {
	ms := testMasterServerWithExpandMocks(t)
	ms.blockRegistry.MarkBlockCapable("vs1:9333")
	ms.blockRegistry.MarkBlockCapable("vs2:9333")
	ms.blockRegistry.MarkBlockCapable("vs3:9333")

	// Allocation mock: derive every address from the server name so each
	// volume server gets distinct, predictable endpoints.
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		return &blockAllocResult{
			Path:            fmt.Sprintf("/data/%s.blk", name),
			IQN:             fmt.Sprintf("iqn.test:%s", name),
			ISCSIAddr:       server,
			ReplicaDataAddr: server + ":4001",
			ReplicaCtrlAddr: server + ":4002",
		}, nil
	}

	// Fail fast on setup errors instead of dereferencing a missing entry below.
	if _, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
		Name: "b09-vol", SizeBytes: 1 << 30, ReplicaFactor: 3,
	}); err != nil {
		t.Fatalf("create: %v", err)
	}
	entry, ok := ms.blockRegistry.Lookup("b09-vol")
	if !ok {
		t.Fatal("b09-vol missing from registry after create")
	}
	originalPrimary := entry.VolumeServer

	// Record which servers receive PREPARE, in call order, so the assertions
	// below can tell which registry snapshot the coordinator used.
	var preparedServers []string
	ms.blockVSPrepareExpand = func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error {
		preparedServers = append(preparedServers, server)
		return nil
	}
	ms.blockVSCommitExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) {
		return 2 << 30, nil
	}

	// Simulate failover: promote the best replica. The result of the call is
	// not inspected here; the primary comparison below detects a promotion
	// that did not take effect.
	ms.blockRegistry.PromoteBestReplica("b09-vol")
	entry, ok = ms.blockRegistry.Lookup("b09-vol")
	if !ok {
		t.Fatal("b09-vol missing from registry after promotion")
	}
	newPrimary := entry.VolumeServer
	if newPrimary == originalPrimary {
		t.Fatal("promotion didn't change primary")
	}
	if len(entry.Replicas) == 0 {
		t.Fatal("expected at least 1 replica after RF=3 promotion")
	}

	// Expand should use the NEW primary (post-failover), not the old one.
	resp, err := ms.ExpandBlockVolume(context.Background(), &master_pb.ExpandBlockVolumeRequest{
		Name: "b09-vol", NewSizeBytes: 2 << 30,
	})
	if err != nil {
		t.Fatalf("expand: %v", err)
	}
	if resp.CapacityBytes != 2<<30 {
		t.Fatalf("capacity: got %d", resp.CapacityBytes)
	}

	// First PREPARE should have gone to the new primary, not the old one.
	if len(preparedServers) == 0 {
		t.Fatal("no prepare calls recorded")
	}
	if preparedServers[0] != newPrimary {
		t.Fatalf("PREPARE went to %q (stale), should go to %q (fresh primary)",
			preparedServers[0], newPrimary)
	}

	// The old primary must not appear anywhere in the PREPARE sequence.
	for _, s := range preparedServers {
		if s == originalPrimary {
			t.Fatalf("PREPARE sent to old primary %q — stale entry used", originalPrimary)
		}
	}
}
// TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand checks
// that a full heartbeat from the primary which omits the volume does not
// remove the registry entry while an expand is marked in flight, and that the
// entry's ExpandInProgress and ExpandEpoch fields are unchanged afterwards.
func TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand(t *testing.T) {
	ms := testMasterServerWithExpandMocks(t)
	ms.blockRegistry.MarkBlockCapable("vs1:9333")
	ms.blockRegistry.MarkBlockCapable("vs2:9333")

	// Allocation mock: derive every address from the server name.
	ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) {
		return &blockAllocResult{
			Path:            fmt.Sprintf("/data/%s.blk", name),
			IQN:             fmt.Sprintf("iqn.test:%s", name),
			ISCSIAddr:       server,
			ReplicaDataAddr: server + ":4001",
			ReplicaCtrlAddr: server + ":4002",
		}, nil
	}

	// Fail fast on setup errors instead of dereferencing a missing entry below.
	if _, err := ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{
		Name: "b10-vol", SizeBytes: 1 << 30,
	}); err != nil {
		t.Fatalf("create: %v", err)
	}
	entry, ok := ms.blockRegistry.Lookup("b10-vol")
	if !ok {
		t.Fatal("b10-vol missing from registry after create")
	}
	primaryServer := entry.VolumeServer

	// Simulate: coordinated expand is in flight (acquire the lock).
	expandEpoch := uint64(42)
	if !ms.blockRegistry.AcquireExpandInflight("b10-vol", 2<<30, expandEpoch) {
		t.Fatal("failed to acquire expand inflight")
	}
	// Deferred so the inflight marker is released even when an assertion
	// below aborts the test via t.Fatal.
	defer ms.blockRegistry.ReleaseExpandInflight("b10-vol")

	// Now simulate primary VS restart: full heartbeat that does NOT report
	// the volume (it hasn't loaded it yet). Without B-10 fix, this deletes
	// the entry from the registry.
	ms.blockRegistry.UpdateFullHeartbeat(primaryServer, []*master_pb.BlockVolumeInfoMessage{
		// Empty: primary restarted and hasn't loaded this volume yet.
	})

	// Entry must still exist — expand is in progress.
	entry, ok = ms.blockRegistry.Lookup("b10-vol")
	if !ok {
		t.Fatal("entry deleted during coordinated expand — B-10 not fixed")
	}

	// Verify expand state is preserved on the surviving entry.
	if !entry.ExpandInProgress {
		t.Fatal("ExpandInProgress should still be true")
	}
	if entry.ExpandEpoch != expandEpoch {
		t.Fatalf("ExpandEpoch: got %d, want %d", entry.ExpandEpoch, expandEpoch)
	}
}
Loading…
Cancel
Save