diff --git a/weed/server/master_block_registry.go b/weed/server/master_block_registry.go index 0026f617f..b0590f2ec 100644 --- a/weed/server/master_block_registry.go +++ b/weed/server/master_block_registry.go @@ -325,6 +325,14 @@ func (r *BlockVolumeRegistry) UpdateFullHeartbeat(server string, infos []*master if entry.VolumeServer == server { // Server is the primary: check if primary path is reported. if _, found := reported[entry.Path]; !found { + // B-10: Do not delete entries with a coordinated expand in flight. + // The primary may have restarted mid-expand; deleting the entry + // would orphan the volume and strand the expand coordinator. + if entry.ExpandInProgress { + glog.Warningf("block registry: skipping stale-cleanup for %q (ExpandInProgress=true, server=%s)", + name, server) + continue + } delete(r.volumes, name) delete(names, name) // Also clean up replica entries from byServer. diff --git a/weed/server/master_grpc_server_block.go b/weed/server/master_grpc_server_block.go index 8bc4fc8de..b8f7a0c82 100644 --- a/weed/server/master_grpc_server_block.go +++ b/weed/server/master_grpc_server_block.go @@ -409,6 +409,15 @@ func (ms *MasterServer) ExpandBlockVolume(ctx context.Context, req *master_pb.Ex } }() + // B-09: Re-read entry after acquiring expand lock. Between the initial + // Lookup and AcquireExpandInflight, failover may have changed VolumeServer + // or Replicas. Using the stale snapshot would send PREPARE to dead nodes. + entry, ok = ms.blockRegistry.Lookup(req.Name) + if !ok { + expandClean = true + return nil, fmt.Errorf("block volume %q disappeared during expand", req.Name) + } + // Track prepared nodes for rollback. var prepared []string diff --git a/weed/server/master_grpc_server_block_test.go b/weed/server/master_grpc_server_block_test.go index 42d1d92e9..1d29191ee 100644 --- a/weed/server/master_grpc_server_block_test.go +++ b/weed/server/master_grpc_server_block_test.go @@ -1652,3 +1652,134 @@ func TestMaster_ExpandCoordinated_RestartRecovery(t *testing.T) { t.Fatalf("capacity: got %d", resp.CapacityBytes) } } + +func TestMaster_ExpandCoordinated_B09_ReReadsEntryAfterLock(t *testing.T) { + // B-09: If failover changes VolumeServer between initial Lookup and + // AcquireExpandInflight, the coordinator must use the fresh entry, + // not the stale one. Use RF=3 so promotion still leaves 1 replica + // and the coordinated path is taken. + ms := testMasterServerWithExpandMocks(t) + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + ms.blockRegistry.MarkBlockCapable("vs3:9333") + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.test:%s", name), + ISCSIAddr: server, + ReplicaDataAddr: server + ":4001", + ReplicaCtrlAddr: server + ":4002", + }, nil + } + + ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{ + Name: "b09-vol", SizeBytes: 1 << 30, ReplicaFactor: 3, + }) + + entry, _ := ms.blockRegistry.Lookup("b09-vol") + originalPrimary := entry.VolumeServer + + // Record which servers receive PREPARE to verify the fresh entry is used. + var preparedServers []string + ms.blockVSPrepareExpand = func(ctx context.Context, server string, name string, newSize, expandEpoch uint64) error { + preparedServers = append(preparedServers, server) + return nil + } + ms.blockVSCommitExpand = func(ctx context.Context, server string, name string, expandEpoch uint64) (uint64, error) { + return 2 << 30, nil + } + + // Simulate failover: promote best replica. With RF=3, one replica + // becomes primary and the other stays as replica → coordinated path. + ms.blockRegistry.PromoteBestReplica("b09-vol") + + entry, _ = ms.blockRegistry.Lookup("b09-vol") + newPrimary := entry.VolumeServer + if newPrimary == originalPrimary { + t.Fatal("promotion didn't change primary") + } + if len(entry.Replicas) == 0 { + t.Fatal("expected at least 1 replica after RF=3 promotion") + } + + // Expand should use the NEW primary (post-failover), not the old one. + resp, err := ms.ExpandBlockVolume(context.Background(), &master_pb.ExpandBlockVolumeRequest{ + Name: "b09-vol", NewSizeBytes: 2 << 30, + }) + if err != nil { + t.Fatalf("expand: %v", err) + } + if resp.CapacityBytes != 2<<30 { + t.Fatalf("capacity: got %d", resp.CapacityBytes) + } + + // First PREPARE should have gone to the new primary, not the old one. + if len(preparedServers) == 0 { + t.Fatal("no prepare calls recorded") + } + if preparedServers[0] != newPrimary { + t.Fatalf("PREPARE went to %q (stale), should go to %q (fresh primary)", + preparedServers[0], newPrimary) + } + // Verify old primary was NOT contacted. + for _, s := range preparedServers { + if s == originalPrimary { + t.Fatalf("PREPARE sent to old primary %q — stale entry used", originalPrimary) + } + } +} + +func TestMaster_ExpandCoordinated_B10_HeartbeatDoesNotDeleteDuringExpand(t *testing.T) { + // B-10: A full heartbeat from a restarted primary must not delete + // the registry entry while a coordinated expand is in progress. + ms := testMasterServerWithExpandMocks(t) + ms.blockRegistry.MarkBlockCapable("vs1:9333") + ms.blockRegistry.MarkBlockCapable("vs2:9333") + ms.blockVSAllocate = func(ctx context.Context, server string, name string, sizeBytes uint64, diskType string, durabilityMode string) (*blockAllocResult, error) { + return &blockAllocResult{ + Path: fmt.Sprintf("/data/%s.blk", name), + IQN: fmt.Sprintf("iqn.test:%s", name), + ISCSIAddr: server, + ReplicaDataAddr: server + ":4001", + ReplicaCtrlAddr: server + ":4002", + }, nil + } + + ms.CreateBlockVolume(context.Background(), &master_pb.CreateBlockVolumeRequest{ + Name: "b10-vol", SizeBytes: 1 << 30, + }) + + entry, _ := ms.blockRegistry.Lookup("b10-vol") + primaryServer := entry.VolumeServer + + // Simulate: coordinated expand is in flight (acquire the lock). + expandEpoch := uint64(42) + if !ms.blockRegistry.AcquireExpandInflight("b10-vol", 2<<30, expandEpoch) { + t.Fatal("failed to acquire expand inflight") + } + + // Now simulate primary VS restart: full heartbeat that does NOT report + // the volume (it hasn't loaded it yet). Without B-10 fix, this deletes + // the entry from the registry. + ms.blockRegistry.UpdateFullHeartbeat(primaryServer, []*master_pb.BlockVolumeInfoMessage{ + // Empty: primary restarted and hasn't loaded this volume yet. + }) + + // Entry must still exist — expand is in progress. + _, ok := ms.blockRegistry.Lookup("b10-vol") + if !ok { + t.Fatal("entry deleted during coordinated expand — B-10 not fixed") + } + + // Verify expand state is preserved. + entry, _ = ms.blockRegistry.Lookup("b10-vol") + if !entry.ExpandInProgress { + t.Fatal("ExpandInProgress should still be true") + } + if entry.ExpandEpoch != expandEpoch { + t.Fatalf("ExpandEpoch: got %d, want %d", entry.ExpandEpoch, expandEpoch) + } + + // Cleanup. + ms.blockRegistry.ReleaseExpandInflight("b10-vol") +}