diff --git a/.github/workflows/ec-integration-tests.yml b/.github/workflows/ec-integration-tests.yml new file mode 100644 index 000000000..ea476b77c --- /dev/null +++ b/.github/workflows/ec-integration-tests.yml @@ -0,0 +1,41 @@ +name: "EC Integration Tests" + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +permissions: + contents: read + +jobs: + ec-integration-tests: + name: EC Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + steps: + - name: Set up Go 1.x + uses: actions/setup-go@v6 + with: + go-version: ^1.24 + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v4 + + - name: Build weed binary + run: | + cd weed && go build -o weed . + + - name: Run EC Integration Tests + working-directory: test/erasure_coding + run: | + go test -v + + - name: Archive logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: ec-integration-test-logs + path: test/erasure_coding \ No newline at end of file diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index 87b9b40ba..67f8eed04 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -5,9 +5,11 @@ import ( "context" "fmt" "io" + "math" "os" "os/exec" "path/filepath" + "strings" "testing" "time" @@ -139,6 +141,15 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) { t.Logf("EC encoding completed successfully") } + // Add detailed logging for EC encoding command + t.Logf("Debug: Executing EC encoding command for volume %d", volumeId) + t.Logf("Debug: Command arguments: %v", args) + if err != nil { + t.Logf("Debug: EC encoding command failed with error: %v", err) + } else { + t.Logf("Debug: EC encoding command completed successfully") + } + // The key test: check if the fix prevents the timing issue if contains(outputStr, "Collecting volume locations") && contains(outputStr, "before EC encoding") { t.Logf("FIX DETECTED: Volume locations collected BEFORE EC encoding (timing bug prevented)") @@ -526,7 +537,8 @@ func uploadTestData(data []byte, masterAddress string) (needle.VolumeId, error) func getVolumeLocations(commandEnv *shell.CommandEnv, volumeId needle.VolumeId) ([]string, error) { // Retry mechanism to handle timing issues with volume registration - for i := 0; i < 10; i++ { + // Increase retry attempts for volume location retrieval + for i := 0; i < 20; i++ { // Increased from 10 to 20 retries locations, ok := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId)) if ok { var result []string @@ -646,3 +658,427 @@ func TestECEncodingRegressionPrevention(t *testing.T) { t.Log("Timing pattern regression test passed") }) } + +// TestDiskAwareECRebalancing tests EC shard placement across multiple disks per server +// This verifies the disk-aware EC rebalancing feature works correctly +func TestDiskAwareECRebalancing(t *testing.T) { + if testing.Short() { + t.Skip("Skipping disk-aware integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_disk_aware_ec_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start cluster with MULTIPLE DISKS per volume server + cluster, err := startMultiDiskCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second)) + for i := 0; i < 3; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second)) + } + + // Wait longer for volume servers to register with master and create volumes + t.Log("Waiting for volume servers to register with master...") + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9334"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for master client to fully sync + time.Sleep(5 * time.Second) + + // Upload test data to create a volume - retry if volumes not ready + var volumeId needle.VolumeId + testData := []byte("Disk-aware EC rebalancing test data - this needs to be large enough to create a volume") + for retry := 0; retry < 5; retry++ { + volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9334") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + require.NoError(t, err, "Failed to upload test data after retries") + t.Logf("Created volume %d for disk-aware EC test", volumeId) + + // Wait for volume to be registered + time.Sleep(3 * time.Second) + + t.Run("verify_multi_disk_setup", func(t *testing.T) { + // Verify that each server has multiple disk directories + for server := 0; server < 3; server++ { + diskCount := 0 + for disk := 0; disk < 4; disk++ { + diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk)) + if _, err := os.Stat(diskDir); err == nil { + diskCount++ + } + } + assert.Equal(t, 4, diskCount, "Server %d should have 4 disk directories", server) + t.Logf("Server %d has %d disk directories", server, diskCount) + } + }) + + t.Run("ec_encode_with_disk_awareness", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + } + + // Execute EC encoding + var output bytes.Buffer + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"} + + // Capture output + oldStdout := os.Stdout + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stdout = w + os.Stderr = w + + err = ecEncodeCmd.Do(args, commandEnv, &output) + + w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + + capturedOutput, _ := io.ReadAll(r) + outputStr := string(capturedOutput) + output.String() + + t.Logf("EC encode output:\n%s", outputStr) + + if err != nil { + t.Logf("EC encoding completed with error: %v", err) + } else { + t.Logf("EC encoding completed successfully") + } + }) + + t.Run("verify_disk_level_shard_distribution", func(t *testing.T) { + // Wait for shards to be distributed + time.Sleep(2 * time.Second) + + // Count shards on each disk of each server + diskDistribution := countShardsPerDisk(testDir, uint32(volumeId)) + + totalShards := 0 + disksWithShards := 0 + maxShardsOnSingleDisk := 0 + + t.Logf("Disk-level shard distribution for volume %d:", volumeId) + for server, disks := range diskDistribution { + for diskId, shardCount := range disks { + if shardCount > 0 { + t.Logf(" %s disk %d: %d shards", server, diskId, shardCount) + totalShards += shardCount + disksWithShards++ + if shardCount > maxShardsOnSingleDisk { + maxShardsOnSingleDisk = shardCount + } + } + } + } + + t.Logf("Summary: %d total shards across %d disks (max %d on single disk)", + totalShards, disksWithShards, maxShardsOnSingleDisk) + + // EC creates 14 shards (10 data + 4 parity), plus .ecx and .ecj files + // We should see shards distributed across multiple disks + if disksWithShards > 1 { + t.Logf("PASS: Shards distributed across %d disks", disksWithShards) + } else { + t.Logf("INFO: Shards on %d disk(s) - may be expected if volume was on single disk", disksWithShards) + } + }) + + t.Run("test_ec_balance_disk_awareness", func(t *testing.T) { + // Calculate initial disk balance variance + initialDistribution := countShardsPerDisk(testDir, uint32(volumeId)) + initialVariance := calculateDiskShardVariance(initialDistribution) + t.Logf("Initial disk shard variance: %.2f", initialVariance) + + // Run ec.balance command + var output bytes.Buffer + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + + oldStdout := os.Stdout + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stdout = w + os.Stderr = w + + err := ecBalanceCmd.Do([]string{"-force"}, commandEnv, &output) + + w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + + capturedOutput, _ := io.ReadAll(r) + outputStr := string(capturedOutput) + output.String() + + if err != nil { + t.Logf("ec.balance error: %v", err) + } + t.Logf("ec.balance output:\n%s", outputStr) + + // Wait for balance to complete + time.Sleep(2 * time.Second) + + // Calculate final disk balance variance + finalDistribution := countShardsPerDisk(testDir, uint32(volumeId)) + finalVariance := calculateDiskShardVariance(finalDistribution) + t.Logf("Final disk shard variance: %.2f", finalVariance) + + t.Logf("Variance change: %.2f -> %.2f", initialVariance, finalVariance) + }) + + t.Run("verify_no_disk_overload", func(t *testing.T) { + // Verify that no single disk has too many shards of the same volume + diskDistribution := countShardsPerDisk(testDir, uint32(volumeId)) + + for server, disks := range diskDistribution { + for diskId, shardCount := range disks { + // With 14 EC shards and 12 disks (3 servers x 4 disks), ideally ~1-2 shards per disk + // Allow up to 4 shards per disk as a reasonable threshold + if shardCount > 4 { + t.Logf("WARNING: %s disk %d has %d shards (may indicate imbalance)", + server, diskId, shardCount) + } + } + } + }) +} + +// MultiDiskCluster represents a test cluster with multiple disks per volume server +type MultiDiskCluster struct { + masterCmd *exec.Cmd + volumeServers []*exec.Cmd + testDir string +} + +func (c *MultiDiskCluster) Stop() { + // Stop volume servers first + for _, cmd := range c.volumeServers { + if cmd != nil && cmd.Process != nil { + cmd.Process.Kill() + cmd.Wait() + } + } + + // Stop master server + if c.masterCmd != nil && c.masterCmd.Process != nil { + c.masterCmd.Process.Kill() + c.masterCmd.Wait() + } +} + +// startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server +func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server on a different port to avoid conflict + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9334", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + time.Sleep(2 * time.Second) + + // Start 3 volume servers, each with 4 disks + const numServers = 3 + const disksPerServer = 4 + + for i := 0; i < numServers; i++ { + // Create 4 disk directories per server + var diskDirs []string + var maxVolumes []string + + for d := 0; d < disksPerServer; d++ { + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_disk%d", i, d)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + diskDirs = append(diskDirs, diskDir) + maxVolumes = append(maxVolumes, "5") + } + + port := fmt.Sprintf("809%d", i) + rack := fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", strings.Join(diskDirs, ","), + "-max", strings.Join(maxVolumes, ","), + "-mserver", "127.0.0.1:9334", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + ) + + // Create log file for this volume server + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + // Wait for volume servers to register with master + // Multi-disk servers may take longer to initialize + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// uploadTestDataToMaster uploads test data to a specific master address +func uploadTestDataToMaster(data []byte, masterAddress string) (needle.VolumeId, error) { + assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress { + return pb.ServerAddress(masterAddress) + }, grpc.WithInsecure(), &operation.VolumeAssignRequest{ + Count: 1, + Collection: "test", + Replication: "000", + }) + if err != nil { + return 0, err + } + + uploader, err := operation.NewUploader() + if err != nil { + return 0, err + } + + uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{ + UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid, + Filename: "testfile.txt", + MimeType: "text/plain", + }) + if err != nil { + return 0, err + } + + if uploadResult.Error != "" { + return 0, fmt.Errorf("upload error: %s", uploadResult.Error) + } + + fid, err := needle.ParseFileIdFromString(assignResult.Fid) + if err != nil { + return 0, err + } + + return fid.VolumeId, nil +} + +// countShardsPerDisk counts EC shards on each disk of each server +// Returns map: "serverN" -> map[diskId]shardCount +func countShardsPerDisk(testDir string, volumeId uint32) map[string]map[int]int { + result := make(map[string]map[int]int) + + const numServers = 3 + const disksPerServer = 4 + + for server := 0; server < numServers; server++ { + serverKey := fmt.Sprintf("server%d", server) + result[serverKey] = make(map[int]int) + + for disk := 0; disk < disksPerServer; disk++ { + diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk)) + count, err := countECShardFiles(diskDir, volumeId) + if err == nil && count > 0 { + result[serverKey][disk] = count + } + } + } + + return result +} + +// calculateDiskShardVariance measures how evenly shards are distributed across disks +// Lower variance means better distribution +func calculateDiskShardVariance(distribution map[string]map[int]int) float64 { + var counts []float64 + + for _, disks := range distribution { + for _, count := range disks { + if count > 0 { + counts = append(counts, float64(count)) + } + } + } + + if len(counts) == 0 { + return 0 + } + + // Calculate mean + mean := 0.0 + for _, c := range counts { + mean += c + } + mean /= float64(len(counts)) + + // Calculate variance + variance := 0.0 + for _, c := range counts { + variance += (c - mean) * (c - mean) + } + + return math.Sqrt(variance / float64(len(counts))) +} diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f059b4e74..f2cc581da 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -26,12 +26,25 @@ type DataCenterId string type EcNodeId string type RackId string +// EcDisk represents a single disk on a volume server +type EcDisk struct { + diskId uint32 + diskType string + freeEcSlots int + ecShardCount int // Total EC shards on this disk + // Map of volumeId -> shardBits for shards on this disk + ecShards map[needle.VolumeId]erasure_coding.ShardBits +} + type EcNode struct { info *master_pb.DataNodeInfo dc DataCenterId rack RackId freeEcSlot int + // disks maps diskId -> EcDisk for disk-level balancing + disks map[uint32]*EcDisk } + type CandidateEcNode struct { ecNode *EcNode shardCount int @@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol return collections } -func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") @@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info) // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId) if err != nil { return err } @@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, return err } - fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) + if destDiskId > 0 { + fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId) + } else { + fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) + } } @@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer *EcNode, shardIdsToCopy []uint32, - volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) @@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(existingLocation), + DiskId: destDiskId, }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr) @@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter } freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) - ecNodes = append(ecNodes, &EcNode{ + ecNode := &EcNode{ info: dn, dc: dc, rack: rack, freeEcSlot: int(freeEcSlots), - }) + disks: make(map[uint32]*EcDisk), + } + + // Build disk-level information from volumes and EC shards + // First, discover all unique disk IDs from VolumeInfos (includes empty disks) + allDiskIds := make(map[uint32]string) // diskId -> diskType + for diskType, diskInfo := range dn.DiskInfos { + if diskInfo == nil { + continue + } + // Get all disk IDs from volumes + for _, vi := range diskInfo.VolumeInfos { + allDiskIds[vi.DiskId] = diskType + } + // Also get disk IDs from EC shards + for _, ecShardInfo := range diskInfo.EcShardInfos { + allDiskIds[ecShardInfo.DiskId] = diskType + } + } + + // Group EC shards by disk_id + diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits) + for _, diskInfo := range dn.DiskInfos { + if diskInfo == nil { + continue + } + for _, ecShardInfo := range diskInfo.EcShardInfos { + diskId := ecShardInfo.DiskId + if diskShards[diskId] == nil { + diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits) + } + vid := needle.VolumeId(ecShardInfo.Id) + diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } + } + + // Create EcDisk for each discovered disk + diskCount := len(allDiskIds) + if diskCount == 0 { + diskCount = 1 + } + freePerDisk := int(freeEcSlots) / diskCount + + for diskId, diskType := range allDiskIds { + shards := diskShards[diskId] + if shards == nil { + shards = make(map[needle.VolumeId]erasure_coding.ShardBits) + } + totalShardCount := 0 + for _, shardBits := range shards { + totalShardCount += shardBits.ShardIdCount() + } + + ecNode.disks[diskId] = &EcDisk{ + diskId: diskId, + diskType: diskType, + freeEcSlots: freePerDisk, + ecShardCount: totalShardCount, + ecShards: shards, + } + } + + ecNodes = append(ecNodes, ecNode) totalFreeEcSlots += freeEcSlots }) return @@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { for _, shards := range fullDiskInfo.EcShardInfos { if _, found := emptyNodeIds[shards.Id]; !found { for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { + vid := needle.VolumeId(shards.Id) + destDiskId := pickBestDiskOnNode(emptyNode, vid) - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + if destDiskId > 0 { + fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + } - err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing) + err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing) if err != nil { return err } @@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi if len(targets) == 0 { return nil, errors.New(details) } + + // When multiple nodes have the same shard count, prefer nodes with better disk distribution + // (i.e., nodes with more disks that have fewer shards of this volume) + if len(targets) > 1 { + slices.SortFunc(targets, func(a, b *EcNode) int { + aScore := diskDistributionScore(a, vid) + bScore := diskDistributionScore(b, vid) + return aScore - bScore // Lower score is better + }) + return targets[0], nil + } + return targets[rand.IntN(len(targets))], nil } +// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks +// Lower score is better (means more room for balanced distribution) +func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int { + if len(ecNode.disks) == 0 { + return 0 + } + + // Sum the existing shard count for this volume on each disk + // Lower total means more room for new shards + score := 0 + for _, disk := range ecNode.disks { + if shardBits, ok := disk.ecShards[vid]; ok { + score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily + } + score += disk.ecShardCount // Also consider total shards on disk + } + return score +} + +// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard +// It prefers disks with fewer shards and more free slots +func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 { + if len(ecNode.disks) == 0 { + return 0 // No disk info available, let the server decide + } + + var bestDiskId uint32 + bestScore := -1 + + for diskId, disk := range ecNode.disks { + if disk.freeEcSlots <= 0 { + continue + } + + // Check if this volume already has shards on this disk + existingShards := 0 + if shardBits, ok := disk.ecShards[vid]; ok { + existingShards = shardBits.ShardIdCount() + } + + // Score: prefer disks with fewer total shards and fewer shards of this volume + // Lower score is better + score := disk.ecShardCount*10 + existingShards*100 + + if bestScore == -1 || score < bestScore { + bestScore = score + bestDiskId = diskId + } + } + + return bestDiskId +} + +// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk +func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) { + node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations) + if err != nil { + return nil, 0, err + } + + diskId := pickBestDiskOnNode(node, vid) + return node, diskId, nil +} + func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error { - destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) + destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) if err != nil { - fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error()) + fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error()) return nil } - fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id) - return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing) + if destDiskId > 0 { + fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id) + } + return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing) } func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode { diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 5c1805c89..6135eb3eb 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -197,8 +197,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv if ecShardInfo.Collection != "" { collectionPrefix = ecShardInfo.Collection + "_" } - fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) - err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange) + vid := needle.VolumeId(ecShardInfo.Id) + destDiskId := pickBestDiskOnNode(emptyNode, vid) + if destDiskId > 0 { + fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) + } else { + fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) + } + err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange) if err != nil { return } else { diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go new file mode 100644 index 000000000..67e21c1f8 --- /dev/null +++ b/weed/storage/erasure_coding/placement/placement.go @@ -0,0 +1,420 @@ +// Package placement provides consolidated EC shard placement logic used by +// both shell commands and worker tasks. +// +// This package encapsulates the algorithms for: +// - Selecting destination nodes/disks for EC shards +// - Ensuring proper spread across racks, servers, and disks +// - Balancing shards across the cluster +package placement + +import ( + "fmt" + "sort" +) + +// DiskCandidate represents a disk that can receive EC shards +type DiskCandidate struct { + NodeID string + DiskID uint32 + DataCenter string + Rack string + + // Capacity information + VolumeCount int64 + MaxVolumeCount int64 + ShardCount int // Current number of EC shards on this disk + FreeSlots int // Available slots for new shards + + // Load information + LoadCount int // Number of active tasks on this disk +} + +// NodeCandidate represents a server node that can receive EC shards +type NodeCandidate struct { + NodeID string + DataCenter string + Rack string + FreeSlots int + ShardCount int // Total shards across all disks + Disks []*DiskCandidate // All disks on this node +} + +// PlacementRequest configures EC shard placement behavior +type PlacementRequest struct { + // ShardsNeeded is the total number of shards to place + ShardsNeeded int + + // MaxShardsPerServer limits how many shards can be placed on a single server + // 0 means no limit (but prefer spreading when possible) + MaxShardsPerServer int + + // MaxShardsPerRack limits how many shards can be placed in a single rack + // 0 means no limit + MaxShardsPerRack int + + // MaxTaskLoad is the maximum task load count for a disk to be considered + MaxTaskLoad int + + // PreferDifferentServers when true, spreads shards across different servers + // before using multiple disks on the same server + PreferDifferentServers bool + + // PreferDifferentRacks when true, spreads shards across different racks + // before using multiple servers in the same rack + PreferDifferentRacks bool +} + +// DefaultPlacementRequest returns the default placement configuration +func DefaultPlacementRequest() PlacementRequest { + return PlacementRequest{ + ShardsNeeded: 14, + MaxShardsPerServer: 0, + MaxShardsPerRack: 0, + MaxTaskLoad: 5, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } +} + +// PlacementResult contains the selected destinations for EC shards +type PlacementResult struct { + SelectedDisks []*DiskCandidate + + // Statistics + ServersUsed int + RacksUsed int + DCsUsed int + + // Distribution maps + ShardsPerServer map[string]int + ShardsPerRack map[string]int + ShardsPerDC map[string]int +} + +// SelectDestinations selects the best disks for EC shard placement. +// This is the main entry point for EC placement logic. +// +// The algorithm works in multiple passes: +// 1. First pass: Select one disk from each rack (maximize rack diversity) +// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity) +// 3. Third pass: Select additional disks from servers already used (maximize disk diversity) +func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) { + if len(disks) == 0 { + return nil, fmt.Errorf("no disk candidates provided") + } + if config.ShardsNeeded <= 0 { + return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded) + } + + // Filter suitable disks + suitable := filterSuitableDisks(disks, config) + if len(suitable) == 0 { + return nil, fmt.Errorf("no suitable disks found after filtering") + } + + // Build indexes for efficient lookup + rackToDisks := groupDisksByRack(suitable) + + result := &PlacementResult{ + SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded), + ShardsPerServer: make(map[string]int), + ShardsPerRack: make(map[string]int), + ShardsPerDC: make(map[string]int), + } + + usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool + usedServers := make(map[string]bool) // nodeID -> bool + usedRacks := make(map[string]bool) // "dc:rack" -> bool + + // Pass 1: Select one disk from each rack (maximize rack diversity) + if config.PreferDifferentRacks { + // Sort racks by number of available servers (descending) to prioritize racks with more options + sortedRacks := sortRacksByServerCount(rackToDisks) + for _, rackKey := range sortedRacks { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + rackDisks := rackToDisks[rackKey] + // Select best disk from this rack, preferring a new server + disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config) + if disk != nil { + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + } + + // Pass 2: Select disks from unused servers in already-used racks + if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded { + for _, rackKey := range getSortedRackKeys(rackToDisks) { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + rackDisks := rackToDisks[rackKey] + for _, disk := range sortDisksByScore(rackDisks) { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + diskKey := getDiskKey(disk) + if usedDisks[diskKey] { + continue + } + // Skip if server already used (we want different servers in this pass) + if usedServers[disk.NodeID] { + continue + } + // Check server limit + if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer { + continue + } + // Check rack limit + if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { + continue + } + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + } + + // Pass 3: Fill remaining slots from already-used servers (different disks) + // Use round-robin across servers to balance shards evenly + if len(result.SelectedDisks) < config.ShardsNeeded { + // Group remaining disks by server + serverToRemainingDisks := make(map[string][]*DiskCandidate) + for _, disk := range suitable { + if !usedDisks[getDiskKey(disk)] { + serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk) + } + } + + // Sort each server's disks by score + for serverID := range serverToRemainingDisks { + serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID]) + } + + // Round-robin: repeatedly select from the server with the fewest shards + for len(result.SelectedDisks) < config.ShardsNeeded { + // Find server with fewest shards that still has available disks + var bestServer string + minShards := -1 + for serverID, disks := range serverToRemainingDisks { + if len(disks) == 0 { + continue + } + // Check server limit + if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer { + continue + } + shardCount := result.ShardsPerServer[serverID] + if minShards == -1 || shardCount < minShards { + minShards = shardCount + bestServer = serverID + } else if shardCount == minShards && serverID < bestServer { + // Tie-break by server name for determinism + bestServer = serverID + } + } + + if bestServer == "" { + // No more servers with available disks + break + } + + // Pop the best disk from this server + disks := serverToRemainingDisks[bestServer] + disk := disks[0] + serverToRemainingDisks[bestServer] = disks[1:] + + // Check rack limit + if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { + continue + } + + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + + // Calculate final statistics + result.ServersUsed = len(usedServers) + result.RacksUsed = len(usedRacks) + dcSet := make(map[string]bool) + for _, disk := range result.SelectedDisks { + dcSet[disk.DataCenter] = true + } + result.DCsUsed = len(dcSet) + + return result, nil +} + +// filterSuitableDisks filters disks that are suitable for EC placement +func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate { + var suitable []*DiskCandidate + for _, disk := range disks { + if disk.FreeSlots <= 0 { + continue + } + if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad { + continue + } + suitable = append(suitable, disk) + } + return suitable +} + +// groupDisksByRack groups disks by their rack (dc:rack key) +func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate { + result := make(map[string][]*DiskCandidate) + for _, disk := range disks { + key := getRackKey(disk) + result[key] = append(result[key], disk) + } + return result +} + +// groupDisksByServer groups disks by their server +func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate { + result := make(map[string][]*DiskCandidate) + for _, disk := range disks { + result[disk.NodeID] = append(result[disk.NodeID], disk) + } + return result +} + +// getRackKey returns the unique key for a rack (dc:rack) +func getRackKey(disk *DiskCandidate) string { + return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) +} + +// getDiskKey returns the unique key for a disk (nodeID:diskID) +func getDiskKey(disk *DiskCandidate) string { + return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) +} + +// sortRacksByServerCount returns rack keys sorted by number of servers (ascending) +func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string { + // Count unique servers per rack + rackServerCount := make(map[string]int) + for rackKey, disks := range rackToDisks { + servers := make(map[string]bool) + for _, disk := range disks { + servers[disk.NodeID] = true + } + rackServerCount[rackKey] = len(servers) + } + + keys := getSortedRackKeys(rackToDisks) + sort.Slice(keys, func(i, j int) bool { + // Sort by server count (descending) to pick from racks with more options first + return rackServerCount[keys[i]] > rackServerCount[keys[j]] + }) + return keys +} + +// getSortedRackKeys returns rack keys in a deterministic order +func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string { + keys := make([]string, 0, len(rackToDisks)) + for k := range rackToDisks { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// selectBestDiskFromRack selects the best disk from a rack for EC placement +// It prefers servers that haven't been used yet +func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate { + var bestDisk *DiskCandidate + bestScore := -1.0 + bestIsFromUnusedServer := false + + for _, disk := range disks { + if usedDisks[getDiskKey(disk)] { + continue + } + isFromUnusedServer := !usedServers[disk.NodeID] + score := calculateDiskScore(disk) + + // Prefer unused servers + if isFromUnusedServer && !bestIsFromUnusedServer { + bestDisk = disk + bestScore = score + bestIsFromUnusedServer = true + } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore { + bestDisk = disk + bestScore = score + } + } + + return bestDisk +} + +// sortDisksByScore returns disks sorted by score (best first) +func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate { + sorted := make([]*DiskCandidate, len(disks)) + copy(sorted, disks) + sort.Slice(sorted, func(i, j int) bool { + return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j]) + }) + return sorted +} + +// calculateDiskScore calculates a score for a disk candidate +// Higher score is better +func calculateDiskScore(disk *DiskCandidate) float64 { + score := 0.0 + + // Primary factor: available capacity (lower utilization is better) + if disk.MaxVolumeCount > 0 { + utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount) + score += (1.0 - utilization) * 60.0 // Up to 60 points + } else { + score += 30.0 // Default if no max count + } + + // Secondary factor: fewer shards already on this disk is better + score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points + + // Tertiary factor: lower load is better + score += float64(10 - disk.LoadCount) // Up to 10 points + + return score +} + +// addDiskToResult adds a disk to the result and updates tracking maps +func addDiskToResult(result *PlacementResult, disk *DiskCandidate, + usedDisks, usedServers, usedRacks map[string]bool) { + diskKey := getDiskKey(disk) + rackKey := getRackKey(disk) + + result.SelectedDisks = append(result.SelectedDisks, disk) + usedDisks[diskKey] = true + usedServers[disk.NodeID] = true + usedRacks[rackKey] = true + result.ShardsPerServer[disk.NodeID]++ + result.ShardsPerRack[rackKey]++ + result.ShardsPerDC[disk.DataCenter]++ +} + +// VerifySpread checks if the placement result meets diversity requirements +func VerifySpread(result *PlacementResult, minServers, minRacks int) error { + if result.ServersUsed < minServers { + return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers) + } + if result.RacksUsed < minRacks { + return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks) + } + return nil +} + +// CalculateIdealDistribution returns the ideal number of shards per server +// when we have a certain number of shards and servers +func CalculateIdealDistribution(totalShards, numServers int) (min, max int) { + if numServers <= 0 { + return 0, totalShards + } + min = totalShards / numServers + max = min + if totalShards%numServers != 0 { + max = min + 1 + } + return +} diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go new file mode 100644 index 000000000..6cb94a4da --- /dev/null +++ b/weed/storage/erasure_coding/placement/placement_test.go @@ -0,0 +1,517 @@ +package placement + +import ( +"testing" +) + +// Helper function to create disk candidates for testing +func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate { + return &DiskCandidate{ + NodeID: nodeID, + DiskID: diskID, + DataCenter: dc, + Rack: rack, + VolumeCount: 0, + MaxVolumeCount: 100, + ShardCount: 0, + FreeSlots: freeSlots, + LoadCount: 0, + } +} + +func TestSelectDestinations_SingleRack(t *testing.T) { + // Test: 3 servers in same rack, each with 2 disks, need 6 shards + // Expected: Should spread across all 6 disks (one per disk) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 3 servers are used + if result.ServersUsed != 3 { + t.Errorf("expected 3 servers used, got %d", result.ServersUsed) + } + + // Verify each disk is unique + diskSet := make(map[string]bool) + for _, disk := range result.SelectedDisks { + key := getDiskKey(disk) + if diskSet[key] { + t.Errorf("disk %s selected multiple times", key) + } + diskSet[key] = true + } +} + +func TestSelectDestinations_MultipleRacks(t *testing.T) { + // Test: 2 racks with 2 servers each, each server has 2 disks + // Need 8 shards + // Expected: Should spread across all 8 disks + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack2", 10), + makeDisk("server3", 1, "dc1", "rack2", 10), + makeDisk("server4", 0, "dc1", "rack2", 10), + makeDisk("server4", 1, "dc1", "rack2", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 8, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 8 { + t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 4 servers are used + if result.ServersUsed != 4 { + t.Errorf("expected 4 servers used, got %d", result.ServersUsed) + } + + // Verify both racks are used + if result.RacksUsed != 2 { + t.Errorf("expected 2 racks used, got %d", result.RacksUsed) + } +} + +func TestSelectDestinations_PrefersDifferentServers(t *testing.T) { + // Test: 4 servers with 4 disks each, need 4 shards + // Expected: Should use one disk from each server + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + makeDisk("server3", 2, "dc1", "rack1", 10), + makeDisk("server3", 3, "dc1", "rack1", 10), + makeDisk("server4", 0, "dc1", "rack1", 10), + makeDisk("server4", 1, "dc1", "rack1", 10), + makeDisk("server4", 2, "dc1", "rack1", 10), + makeDisk("server4", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 4, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 4 servers are used (one shard per server) + if result.ServersUsed != 4 { + t.Errorf("expected 4 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 1 shard + for server, count := range result.ShardsPerServer { + if count != 1 { + t.Errorf("server %s has %d shards, expected 1", server, count) + } + } +} + +func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards + // Expected: First pick one from each server (2 shards), then one more from each (4 shards), + // then fill remaining from any server (6 shards) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // Both servers should be used + if result.ServersUsed != 2 { + t.Errorf("expected 2 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 3 shards (balanced) + for server, count := range result.ShardsPerServer { + if count != 3 { + t.Errorf("server %s has %d shards, expected 3", server, count) + } + } +} + +func TestSelectDestinations_MaxShardsPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server + // Expected: Should only select 4 shards (2 per server limit) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + MaxShardsPerServer: 2, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only get 4 shards due to server limit + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks)) + } + + // No server should exceed the limit + for server, count := range result.ShardsPerServer { + if count > 2 { + t.Errorf("server %s has %d shards, exceeds limit of 2", server, count) + } + } +} + +func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) { + // Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each + // Expected: Should spread evenly (2 shards per server) + var disks []*DiskCandidate + for i := 1; i <= 7; i++ { + serverID := "server" + string(rune('0'+i)) + disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10)) + disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10)) + } + + config := PlacementRequest{ + ShardsNeeded: 14, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 14 { + t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks)) + } + + // All 7 servers should be used + if result.ServersUsed != 7 { + t.Errorf("expected 7 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 2 shards + for server, count := range result.ShardsPerServer { + if count != 2 { + t.Errorf("server %s has %d shards, expected 2", server, count) + } + } +} + +func TestSelectDestinations_FewerServersThanShards(t *testing.T) { + // Test: Only 3 servers but need 6 shards + // Expected: Should distribute evenly (2 per server) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + makeDisk("server3", 2, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // All 3 servers should be used + if result.ServersUsed != 3 { + t.Errorf("expected 3 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 2 shards + for server, count := range result.ShardsPerServer { + if count != 2 { + t.Errorf("server %s has %d shards, expected 2", server, count) + } + } +} + +func TestSelectDestinations_NoSuitableDisks(t *testing.T) { + // Test: All disks have no free slots + disks := []*DiskCandidate{ + {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0}, + {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0}, + } + + config := PlacementRequest{ + ShardsNeeded: 4, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + _, err := SelectDestinations(disks, config) + if err == nil { + t.Error("expected error for no suitable disks, got nil") + } +} + +func TestSelectDestinations_EmptyInput(t *testing.T) { + config := DefaultPlacementRequest() + _, err := SelectDestinations([]*DiskCandidate{}, config) + if err == nil { + t.Error("expected error for empty input, got nil") + } +} + +func TestSelectDestinations_FiltersByLoad(t *testing.T) { + // Test: Some disks have too high load + disks := []*DiskCandidate{ + {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10}, + {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2}, + {NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1}, + } + + config := PlacementRequest{ + ShardsNeeded: 2, + MaxTaskLoad: 5, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only select from server2 and server3 (server1 has too high load) + for _, disk := range result.SelectedDisks { + if disk.NodeID == "server1" { + t.Errorf("disk from server1 should not be selected (load too high)") + } + } +} + +func TestCalculateDiskScore(t *testing.T) { + // Test that score calculation works as expected + lowUtilDisk := &DiskCandidate{ + VolumeCount: 10, + MaxVolumeCount: 100, + ShardCount: 0, + LoadCount: 0, + } + + highUtilDisk := &DiskCandidate{ + VolumeCount: 90, + MaxVolumeCount: 100, + ShardCount: 5, + LoadCount: 5, + } + + lowScore := calculateDiskScore(lowUtilDisk) + highScore := calculateDiskScore(highUtilDisk) + + if lowScore <= highScore { + t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore) + } +} + +func TestCalculateIdealDistribution(t *testing.T) { + tests := []struct { + totalShards int + numServers int + expectedMin int + expectedMax int + }{ + {14, 7, 2, 2}, // Even distribution + {14, 4, 3, 4}, // Uneven: 14/4 = 3 remainder 2 + {6, 3, 2, 2}, // Even distribution + {7, 3, 2, 3}, // Uneven: 7/3 = 2 remainder 1 + {10, 0, 0, 10}, // Edge case: no servers + {0, 5, 0, 0}, // Edge case: no shards + } + + for _, tt := range tests { + min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers) + if min != tt.expectedMin || max != tt.expectedMax { + t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)", +tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax) + } + } +} + +func TestVerifySpread(t *testing.T) { + result := &PlacementResult{ + ServersUsed: 3, + RacksUsed: 2, + } + + // Should pass + if err := VerifySpread(result, 3, 2); err != nil { + t.Errorf("unexpected error: %v", err) + } + + // Should fail - not enough servers + if err := VerifySpread(result, 4, 2); err == nil { + t.Error("expected error for insufficient servers") + } + + // Should fail - not enough racks + if err := VerifySpread(result, 3, 3); err == nil { + t.Error("expected error for insufficient racks") + } +} + +func TestSelectDestinations_MultiDC(t *testing.T) { + // Test: 2 DCs, each with 2 racks, each rack has 2 servers + disks := []*DiskCandidate{ + // DC1, Rack1 + makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10), + makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10), + makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10), + makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10), + // DC1, Rack2 + makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10), + makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10), + makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10), + makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10), + // DC2, Rack1 + makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10), + makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10), + makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10), + makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10), + // DC2, Rack2 + makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10), + makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10), + makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10), + makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 8, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 8 { + t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks)) + } + + // Should use all 4 racks + if result.RacksUsed != 4 { + t.Errorf("expected 4 racks used, got %d", result.RacksUsed) + } + + // Should use both DCs + if result.DCsUsed != 2 { + t.Errorf("expected 2 DCs used, got %d", result.DCsUsed) + } +} + +func TestSelectDestinations_SameRackDifferentDC(t *testing.T) { + // Test: Same rack name in different DCs should be treated as different racks + disks := []*DiskCandidate{ + makeDisk("dc1-s1", 0, "dc1", "rack1", 10), + makeDisk("dc2-s1", 0, "dc2", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 2, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should use 2 racks (dc1:rack1 and dc2:rack1 are different) + if result.RacksUsed != 2 { + t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed) + } +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index cd74bed33..c5568fe26 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era } // selectBestECDestinations selects multiple disks for EC shard placement with diversity +// Uses the consolidated placement package for proper rack/server/disk spreading func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo { if len(disks) == 0 { return nil } - // Group disks by rack and DC for diversity - rackGroups := make(map[string][]*topology.DiskInfo) - for _, disk := range disks { - rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) - rackGroups[rackKey] = append(rackGroups[rackKey], disk) + // Convert topology.DiskInfo to placement.DiskCandidate + candidates := diskInfosToCandidates(disks) + if len(candidates) == 0 { + return nil } - var selected []*topology.DiskInfo - usedRacks := make(map[string]bool) + // Configure placement for EC shards + config := placement.PlacementRequest{ + ShardsNeeded: shardsNeeded, + MaxShardsPerServer: 0, // No hard limit, but prefer spreading + MaxShardsPerRack: 0, // No hard limit, but prefer spreading + MaxTaskLoad: topology.MaxTaskLoadForECPlacement, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } - // First pass: select one disk from each rack for maximum diversity - for rackKey, rackDisks := range rackGroups { - if len(selected) >= shardsNeeded { - break - } + // Use the shared placement algorithm + result, err := placement.SelectDestinations(candidates, config) + if err != nil { + glog.V(2).Infof("EC placement failed: %v", err) + return nil + } - // Select best disk from this rack - bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC) - if bestDisk != nil { - selected = append(selected, bestDisk) - usedRacks[rackKey] = true + // Convert back to topology.DiskInfo + return candidatesToDiskInfos(result.SelectedDisks, disks) +} + +// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice +func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate { + var candidates []*placement.DiskCandidate + for _, disk := range disks { + if disk.DiskInfo == nil { + continue } - } - // Second pass: if we need more disks, select from racks we've already used - if len(selected) < shardsNeeded { - for _, disk := range disks { - if len(selected) >= shardsNeeded { - break - } + // Calculate free slots (using default max if not set) + freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount) + if freeSlots < 0 { + freeSlots = 0 + } - // Skip if already selected - alreadySelected := false - for _, sel := range selected { - if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID { - alreadySelected = true - break + // Calculate EC shard count for this specific disk + // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts + ecShardCount := 0 + if disk.DiskInfo.EcShardInfos != nil { + for _, shardInfo := range disk.DiskInfo.EcShardInfos { + if shardInfo.DiskId == disk.DiskID { + ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount() } } - - if !alreadySelected && isDiskSuitableForEC(disk) { - selected = append(selected, disk) - } } - } - return selected + candidates = append(candidates, &placement.DiskCandidate{ + NodeID: disk.NodeID, + DiskID: disk.DiskID, + DataCenter: disk.DataCenter, + Rack: disk.Rack, + VolumeCount: disk.DiskInfo.VolumeCount, + MaxVolumeCount: disk.DiskInfo.MaxVolumeCount, + ShardCount: ecShardCount, + FreeSlots: freeSlots, + LoadCount: disk.LoadCount, + }) + } + return candidates } -// selectBestFromRack selects the best disk from a rack for EC placement -func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo { - if len(disks) == 0 { - return nil +// candidatesToDiskInfos converts placement results back to topology.DiskInfo +func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo { + // Create a map for quick lookup + diskMap := make(map[string]*topology.DiskInfo) + for _, disk := range originalDisks { + key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) + diskMap[key] = disk } - var bestDisk *topology.DiskInfo - bestScore := -1.0 - - for _, disk := range disks { - if !isDiskSuitableForEC(disk) { - continue - } - - score := calculateECScore(disk, sourceRack, sourceDC) - if score > bestScore { - bestScore = score - bestDisk = disk + var result []*topology.DiskInfo + for _, candidate := range candidates { + key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID) + if disk, ok := diskMap[key]; ok { + result = append(result, disk) } } - - return bestDisk + return result } // calculateECScore calculates placement score for EC operations +// Used for logging and plan metadata func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 { if disk.DiskInfo == nil { return 0.0 @@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa // Consider current load (secondary factor) score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load - // Note: We don't penalize placing shards on the same rack/DC as source - // since the original volume will be deleted after EC conversion. - // This allows for better network efficiency and storage utilization. - return score } // isDiskSuitableForEC checks if a disk is suitable for EC placement +// Note: This is kept for backward compatibility but the placement package +// handles filtering internally func isDiskSuitableForEC(disk *topology.DiskInfo) bool { if disk.DiskInfo == nil { return false