diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index d00cb5df4..3f2f8f9d4 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -59,8 +59,8 @@ func (ms *MasterServer) ProcessGrowRequest() {
 				continue
 			}
 			dcs := ms.Topo.ListDCAndRacks()
-			var err error
 			for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
+				var err error
 				vl := vlc.VolumeLayout
 				lastGrowCount := vl.GetLastGrowCount()
 				if vl.HasGrowRequest() {
@@ -74,8 +74,14 @@ func (ms *MasterServer) ProcessGrowRequest() {
 
 				switch {
 				case mustGrow > 0:
-					vgr.WritableVolumeCount = uint32(mustGrow)
-					_, err = ms.VolumeGrow(ctx, vgr)
+					if rp, rpErr := super_block.NewReplicaPlacementFromString(vgr.Replication); rpErr != nil {
+						glog.V(0).Infof("failed to parse replica placement %s: %v", vgr.Replication, rpErr)
+					} else {
+						vgr.WritableVolumeCount = uint32(mustGrow)
+						if ms.Topo.AvailableSpaceFor(&topology.VolumeGrowOption{DiskType: types.ToDiskType(vgr.DiskType)}) >= int64(vgr.WritableVolumeCount*uint32(rp.GetCopyCount())) {
+							_, err = ms.VolumeGrow(ctx, vgr)
+						}
+					}
				case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
 					vgr.WritableVolumeCount = volumeGrowStepCount
 					_, err = ms.VolumeGrow(ctx, vgr)
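
Note on the guard above: growing N writable volumes under a replica placement with copyCount copies consumes N * copyCount volume slots, so the master now checks free capacity on the matching disk type before calling VolumeGrow, presumably to avoid issuing grow requests that cannot be fully placed. A minimal sketch of that arithmetic (hasRoomToGrow is an illustrative helper, not the SeaweedFS API):

    // hasRoomToGrow mirrors the capacity check added above: each of the
    // writableVolumeCount new volumes needs one slot per replica copy.
    // Example: replication "010" has copyCount 2, so growing 6 volumes
    // requires at least 12 free slots on the matching disk type.
    func hasRoomToGrow(availableSlots int64, writableVolumeCount uint32, copyCount int) bool {
    	return availableSlots >= int64(writableVolumeCount)*int64(copyCount)
    }
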
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 40950b3be..6b1886b44 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -20,26 +20,46 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 	balanceConfig := config.(*Config)
 
-	// Skip if cluster is too small
+	// Group volumes by disk type to ensure we compare apples to apples
+	volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics)
+	for _, metric := range metrics {
+		volumesByDiskType[metric.DiskType] = append(volumesByDiskType[metric.DiskType], metric)
+	}
+
+	var allParams []*types.TaskDetectionResult
+
+	for diskType, diskMetrics := range volumesByDiskType {
+		if task := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo); task != nil {
+			allParams = append(allParams, task)
+		}
+	}
+
+	return allParams, nil
+}
+
+// detectForDiskType performs balance detection for a specific disk type
+func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult {
+	// Skip if cluster segment is too small
 	minVolumeCount := 2 // More reasonable for small clusters
-	if len(metrics) < minVolumeCount {
-		glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount)
-		return nil, nil
+	if len(diskMetrics) < minVolumeCount {
+		// Only log at verbose level to avoid spamming for small/empty disk types
+		glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount)
+		return nil
 	}
 
 	// Analyze volume distribution across servers
 	serverVolumeCounts := make(map[string]int)
-	for _, metric := range metrics {
+	for _, metric := range diskMetrics {
 		serverVolumeCounts[metric.Server]++
 	}
 
 	if len(serverVolumeCounts) < balanceConfig.MinServerCount {
-		glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
-		return nil, nil
+		glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount)
+		return nil
 	}
 
 	// Calculate balance metrics
-	totalVolumes := len(metrics)
+	totalVolumes := len(diskMetrics)
 	avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
 
 	maxVolumes := 0
@@ -61,14 +81,14 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 	// Check if imbalance exceeds threshold
 	imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
 	if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
-		glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
-			imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-		return nil, nil
+		glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
+			diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+		return nil
 	}
 
 	// Select a volume from the overloaded server for balance
 	var selectedVolume *types.VolumeHealthMetrics
-	for _, metric := range metrics {
+	for _, metric := range diskMetrics {
 		if metric.Server == maxServer {
 			selectedVolume = metric
 			break
@@ -76,13 +96,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		}
 	}
 
 	if selectedVolume == nil {
-		glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
-		return nil, nil
+		glog.Warningf("BALANCE [%s]: Could not find volume on overloaded server %s", diskType, maxServer)
+		return nil
 	}
 
 	// Create balance task with volume and destination planning info
-	reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
-		imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+	reason := fmt.Sprintf("Cluster imbalance detected for %s: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+		diskType, imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
 
 	// Generate task ID for ActiveTopology integration
 	taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
@@ -102,21 +122,22 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 	if clusterInfo.ActiveTopology != nil {
 		// Check if ANY task already exists in ActiveTopology for this volume
 		if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) {
-			glog.V(2).Infof("BALANCE: Skipping volume %d, task already exists in ActiveTopology", selectedVolume.VolumeID)
-			return nil, nil
+			glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID)
+			return nil
 		}
 
 		destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
 		if err != nil {
 			glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
-			return nil, nil // Skip this task if destination planning fails
+			return nil
 		}
 
 		// Find the actual disk containing the volume on the source server
 		sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
 		if !found {
-			return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
-				selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+			glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+				diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+			return nil
 		}
 
 		// Create typed parameters with unified source and target information
@@ -175,17 +196,18 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 			},
 		})
 		if err != nil {
-			return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+			glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err)
+			return nil
 		}
 
 		glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
 			taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
 	} else {
 		glog.Warningf("No ActiveTopology available for destination planning in balance detection")
-		return nil, nil
+		return nil
 	}
 
-	return []*types.TaskDetectionResult{task}, nil
+	return task
 }
 
 // planBalanceDestination plans the destination for a balance operation
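
Two things worth noting about the refactor above. First, detectForDiskType logs and returns nil where Detection previously returned an error, so a problem with one disk type no longer aborts detection for the remaining types. Second, the imbalance metric itself is unchanged, only scoped per disk type: ratio = (maxVolumes - minVolumes) / avgVolumesPerServer. A self-contained sketch of that computation (standalone, for illustration only):

    package main

    import "fmt"

    // imbalanceRatio reproduces the metric from detectForDiskType: the spread
    // between the most- and least-loaded servers, relative to the mean load.
    func imbalanceRatio(counts map[string]int) float64 {
    	total := 0
    	max, min := 0, int(^uint(0)>>1)
    	for _, c := range counts {
    		total += c
    		if c > max {
    			max = c
    		}
    		if c < min {
    			min = c
    		}
    	}
    	avg := float64(total) / float64(len(counts))
    	return float64(max-min) / avg
    }

    func main() {
    	// servers holding 12 and 8 volumes: (12-8)/10 = 0.40, above a 0.2 threshold
    	fmt.Printf("%.2f\n", imbalanceRatio(map[string]int{"s1": 12, "s2": 8}))
    }
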
balance task", - selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + return nil } // Create typed parameters with unified source and target information @@ -175,17 +196,18 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI }, }) if err != nil { - return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err) + glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) + return nil } glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) } else { glog.Warningf("No ActiveTopology available for destination planning in balance detection") - return nil, nil + return nil } - return []*types.TaskDetectionResult{task}, nil + return task } // planBalanceDestination plans the destination for a balance operation diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go new file mode 100644 index 000000000..3fff3720b --- /dev/null +++ b/weed/worker/tasks/balance/detection_test.go @@ -0,0 +1,255 @@ +package balance + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +func createMockTopology(volumes ...*types.VolumeHealthMetrics) *topology.ActiveTopology { + at := topology.NewActiveTopology(0) + + // Group volumes by server for easier topology construction + volumesByServer := make(map[string][]*master_pb.VolumeInformationMessage) + for _, v := range volumes { + if _, ok := volumesByServer[v.Server]; !ok { + volumesByServer[v.Server] = []*master_pb.VolumeInformationMessage{} + } + volumesByServer[v.Server] = append(volumesByServer[v.Server], &master_pb.VolumeInformationMessage{ + Id: v.VolumeID, + Size: v.Size, + Collection: v.Collection, + ReplicaPlacement: 0, + Ttl: 0, + Version: 1, + }) + } + + topoInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + // SSD Nodes + { + Id: "ssd-server-1", + Address: "ssd-server-1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "ssd": { + Type: "ssd", + DiskId: 1, + VolumeInfos: volumesByServer["ssd-server-1"], + MaxVolumeCount: 1000, + }, + }, + }, + { + Id: "ssd-server-2", + Address: "ssd-server-2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "ssd": { + Type: "ssd", + DiskId: 2, + VolumeInfos: volumesByServer["ssd-server-2"], + MaxVolumeCount: 1000, + }, + }, + }, + // HDD Nodes + { + Id: "hdd-server-1", + Address: "hdd-server-1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + Type: "hdd", + DiskId: 3, // Changed index to avoid conflict + VolumeInfos: volumesByServer["hdd-server-1"], + MaxVolumeCount: 1000, + }, + }, + }, + { + Id: "hdd-server-2", + Address: "hdd-server-2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + Type: "hdd", + DiskId: 4, + VolumeInfos: volumesByServer["hdd-server-2"], + MaxVolumeCount: 1000, + }, + 
+
+func TestDetection_MixedDiskTypes(t *testing.T) {
+	// Setup metrics:
+	// 2 SSD servers with 10 volumes each (balanced)
+	// 2 HDD servers with 100 volumes each (balanced)
+	metrics := []*types.VolumeHealthMetrics{}
+
+	// SSD Servers
+	for i := 0; i < 10; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(i + 1),
+			Server:        "ssd-server-1",
+			ServerAddress: "ssd-server-1:8080",
+			DiskType:      "ssd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+	for i := 0; i < 10; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(20 + i + 1),
+			Server:        "ssd-server-2",
+			ServerAddress: "ssd-server-2:8080",
+			DiskType:      "ssd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+
+	// HDD Servers
+	for i := 0; i < 100; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(100 + i + 1),
+			Server:        "hdd-server-1",
+			ServerAddress: "hdd-server-1:8080",
+			DiskType:      "hdd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+	for i := 0; i < 100; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(200 + i + 1),
+			Server:        "hdd-server-2",
+			ServerAddress: "hdd-server-2:8080",
+			DiskType:      "hdd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+
+	conf := &Config{
+		BaseConfig: base.BaseConfig{
+			Enabled:             true,
+			ScanIntervalSeconds: 30,
+			MaxConcurrent:       1,
+		},
+		MinServerCount:     2,
+		ImbalanceThreshold: 0.2, // 20%
+	}
+
+	at := createMockTopology(metrics...)
+	clusterInfo := &types.ClusterInfo{
+		ActiveTopology: at,
+	}
+
+	tasks, err := Detection(metrics, clusterInfo, conf)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	if len(tasks) != 0 {
+		t.Errorf("Expected 0 tasks for balanced mixed types, got %d", len(tasks))
+		for _, task := range tasks {
+			t.Logf("Computed Task: %+v", task.Reason)
+		}
+	}
+}
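
The arithmetic behind this test's expectation is worth spelling out: pooled across disk types, the cluster looks badly skewed (avg = (10+10+100+100)/4 = 55 volumes per server, ratio = (100-10)/55 ≈ 1.64, far above the 0.2 threshold), but within each disk type max equals min, so the per-type ratio is 0 and no task should be created. The pre-refactor, pooled Detection would have flagged this cluster as imbalanced even though each disk type is perfectly balanced.
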
+
+func TestDetection_ImbalancedDiskType(t *testing.T) {
+	// Setup metrics:
+	// 2 SSD servers, one with 100 volumes and one with 10 - imbalance!
+	metrics := []*types.VolumeHealthMetrics{}
+
+	// Server 1 (Overloaded SSD)
+	for i := 0; i < 100; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(i + 1),
+			Server:        "ssd-server-1",
+			ServerAddress: "ssd-server-1:8080",
+			DiskType:      "ssd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+	// Server 2 (Underloaded SSD)
+	for i := 0; i < 10; i++ {
+		metrics = append(metrics, &types.VolumeHealthMetrics{
+			VolumeID:      uint32(100 + i + 1),
+			Server:        "ssd-server-2",
+			ServerAddress: "ssd-server-2:8080",
+			DiskType:      "ssd",
+			Collection:    "c1",
+			Size:          1024,
+			DataCenter:    "dc1",
+			Rack:          "rack1",
+		})
+	}
+
+	conf := &Config{
+		BaseConfig: base.BaseConfig{
+			Enabled:             true,
+			ScanIntervalSeconds: 30,
+			MaxConcurrent:       1,
+		},
+		MinServerCount:     2,
+		ImbalanceThreshold: 0.2,
+	}
+
+	at := createMockTopology(metrics...)
+	clusterInfo := &types.ClusterInfo{
+		ActiveTopology: at,
+	}
+
+	tasks, err := Detection(metrics, clusterInfo, conf)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	if len(tasks) == 0 {
+		t.Error("Expected tasks for imbalanced SSD cluster, got 0")
+	} else {
+		// Verify task details
+		task := tasks[0]
+		if task.VolumeID == 0 {
+			t.Error("Task has invalid VolumeID")
+		}
+		// Expect the volume to move from ssd-server-1 to ssd-server-2
+		if task.TypedParams.Sources[0].Node != "ssd-server-1:8080" {
+			t.Errorf("Expected source ssd-server-1:8080, got %s", task.TypedParams.Sources[0].Node)
+		}
+		if task.TypedParams.Targets[0].Node != "ssd-server-2:8080" {
+			t.Errorf("Expected target ssd-server-2:8080, got %s", task.TypedParams.Targets[0].Node)
+		}
+	}
+}
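
To run just these tests from the repository root with the standard Go tooling:

    go test -v -run 'TestDetection_' ./weed/worker/tasks/balance/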