From 67f6e73ca7857e5a57ae81d462bc53857efaaf1c Mon Sep 17 00:00:00 2001 From: Ping Qiu Date: Thu, 12 Mar 2026 15:12:40 -0700 Subject: [PATCH] fix: B-09 stale entry during expand, B-10 heartbeat deletes during expand B-09: ExpandBlockVolume re-reads the registry entry after acquiring the expand inflight lock. Previously it used the entry from the initial Lookup, which could be stale if failover changed VolumeServer or Replicas between Lookup and PREPARE. B-10: UpdateFullHeartbeat stale-cleanup now skips entries with ExpandInProgress=true. Previously a primary VS restart during coordinated expand would delete the entry (path not in heartbeat), orphaning the volume and stranding the expand coordinator. Co-Authored-By: Claude Opus 4.6 --- weed/server/master_block_registry.go | 8 ++ weed/server/master_grpc_server_block.go | 9 ++ weed/server/master_grpc_server_block_test.go | 131 +++++++++++++++++++ 3 files changed, 148 insertions(+) diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 0026f617f..b0590f2ec 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -325,6 +325,14 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if entry.VolumeServer == server { // Server is the primary: check if primary path is reported. if _, found := reported[entry.Path]; !found { + // B-10: Do not delete entries with a coordinated expand in flight. + // The primary may have restarted mid-expand; deleting the entry + // would orphan the volume and strand the expand coordinator. + if entry.ExpandInProgress { + glog.Warningf("block registry: skipping stale-cleanup for %q (ExpandInProgress=true, server=%s)", + name, server) + continue + } delete(r.volumes, name) delete(names, name) // Also clean up replica entries from byServer. diff --git a/weed/server/master_grpc_server_block.go b/weed/server/master_grpc_server_block.go index 8bc4fc8de..b8f7a0c82 100644 --- a/weed/server/master_grpc_server_block.go +++ b/weed/server/master_grpc_server_block.go @@ -409,6 +409,15 @@ func (ms *MasterServer) ExpandBlockVolume(ctx context.Context, req *master_pb.Ex } }() + // B-09: Re-read entry after acquiring expand lock. Between the initial + // Lookup and AcquireExpandInflight, failover may have changed VolumeServer + // or Replicas. Using the stale snapshot would send PREPARE to dead nodes. + entry, ok = ms.blockRegistry.Lookup(req.Name) + if !ok { + expandClean = true + return nil, fmt.Errorf("block volume %q disappeared during expand", req.Name) + } + // Track prepared nodes for rollback. var prepared []string diff --git a/weed/server/master_grpc_server_block_test.go b/weed/server/master_grpc_server_block_test.go index 42d1d92e9..1d29191ee 100644 --- a/weed/server/master_grpc_server_block_test.go +++ b/weed/server/master_grpc_server_block_test.go @@ -1652,3 +1652,134 @@ func TestMaster_ExpandCoordinated_RestartRecovery(t *testing.T) { t.Fatalf("capacity: got %d", resp.CapacityBytes) } } + +func TestMaster_ExpandCoordinated_B09_ReReadsEntryAfterLock(t *testing.T) { + // B-09: If failover changes VolumeServer between initial Lookup and + // AcquireExpandInflight, the coordinator must use the fresh entry, + // not the stale one. Use RF=3 so promotion still leaves 1 replica + // and the coordinated path is taken. + ms := testMasterServerWithExpandMocks(t) + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + ms.blockRegistry.MarkBlockCapable("vs3:9333") + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.test:%s", name), + ISCSIAddr: server, + ReplicaDataAddr: server + ":4001", + ReplicaCtrlAddr: server + ":4002", + }, nil + } + + ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{ + Name: "b09-vol", SizeBytes: 1 << 30, ReplicaFactor: 3, + }) + + entry, _ := ms.blockRegistry.Lookup("b09-vol") + originalPrimary := entry.VolumeServer + + // Record which servers receive PREPARE to verify the fresh entry is used. + var preparedServers []string + ms.blockVSPrepareExpand = func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error { + preparedServers = append(preparedServers, server) + return nil + } + ms.blockVSCommitExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) { + return 2 << 30, nil + } + + // Simulate failover: promote best replica. With RF=3, one replica + // becomes primary and the other stays as replica → coordinated path. + ms.blockRegistry.PromoteBestReplica("b09-vol") + + entry, _ = ms.blockRegistry.Lookup("b09-vol") + newPrimary := entry.VolumeServer + if newPrimary == originalPrimary { + t.Fatal("promotion didn't change primary") + } + if len(entry.Replicas) == 0 { + t.Fatal("expected at least 1 replica after RF=3 promotion") + } + + // Expand should use the NEW primary (post-failover), not the old one. + resp, err := ms.ExpandBlockVolume(context.Background(), &master_pb.ExpandBlockVolumeRequest{ + Name: "b09-vol", NewSizeBytes: 2 << 30, + }) + if err != nil { + t.Fatalf("expand: %v", err) + } + if resp.CapacityBytes != 2<<30 { + t.Fatalf("capacity: got %d", resp.CapacityBytes) + } + + // First PREPARE should have gone to the new primary, not the old one. + if len(preparedServers) == 0 { + t.Fatal("no prepare calls recorded") + } + if preparedServers[0] != newPrimary { + t.Fatalf("PREPARE went to %q (stale), should go to %q (fresh primary)", + preparedServers[0], newPrimary) + } + // Verify old primary was NOT contacted. + for _, s := range preparedServers { + if s == originalPrimary { + t.Fatalf("PREPARE sent to old primary %q — stale entry used", originalPrimary) + } + } +} + +func TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand(t *testing.T) { + // B-10: A full heartbeat from a restarted primary must not delete + // the registry entry while a coordinated expand is in progress. + ms := testMasterServerWithExpandMocks(t) + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.test:%s", name), + ISCSIAddr: server, + ReplicaDataAddr: server + ":4001", + ReplicaCtrlAddr: server + ":4002", + }, nil + } + + ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{ + Name: "b10-vol", SizeBytes: 1 << 30, + }) + + entry, _ := ms.blockRegistry.Lookup("b10-vol") + primaryServer := entry.VolumeServer + + // Simulate: coordinated expand is in flight (acquire the lock). + expandEpoch := uint64(42) + if !ms.blockRegistry.AcquireExpandInflight("b10-vol", 2<<30, expandEpoch) { + t.Fatal("failed to acquire expand inflight") + } + + // Now simulate primary VS restart: full heartbeat that does NOT report + // the volume (it hasn't loaded it yet). Without B-10 fix, this deletes + // the entry from the registry. + ms.blockRegistry.UpdateFullHeartbeat(primaryServer, []*master_pb.BlockVolumeInfoMessage{ + // Empty: primary restarted and hasn't loaded this volume yet. + }) + + // Entry must still exist — expand is in progress. + _, ok := ms.blockRegistry.Lookup("b10-vol") + if !ok { + t.Fatal("entry deleted during coordinated expand — B-10 not fixed") + } + + // Verify expand state is preserved. + entry, _ = ms.blockRegistry.Lookup("b10-vol") + if !entry.ExpandInProgress { + t.Fatal("ExpandInProgress should still be true") + } + if entry.ExpandEpoch != expandEpoch { + t.Fatalf("ExpandEpoch: got %d, want %d", entry.ExpandEpoch, expandEpoch) + } + + // Cleanup. + ms.blockRegistry.ReleaseExpandInflight("b10-vol") +}