diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go
new file mode 100644
index 000000000..7a362e2c0
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement.go
@@ -0,0 +1,422 @@
+// Package placement provides consolidated EC shard placement logic used by
+// both shell commands and worker tasks.
+//
+// This package encapsulates the algorithms for:
+// - Selecting destination nodes/disks for EC shards
+// - Ensuring proper spread across racks, servers, and disks
+// - Balancing shards across the cluster
+package placement
+
+import (
+	"fmt"
+	"sort"
+)
+
+// DiskCandidate represents a disk that can receive EC shards
+type DiskCandidate struct {
+	NodeID     string
+	DiskID     uint32
+	DataCenter string
+	Rack       string
+
+	// Capacity information
+	VolumeCount    int64
+	MaxVolumeCount int64
+	ShardCount     int // Current number of EC shards on this disk
+	FreeSlots      int // Available slots for new shards
+
+	// Load information
+	LoadCount int // Number of active tasks on this disk
+}
+
+// NodeCandidate represents a server node that can receive EC shards
+type NodeCandidate struct {
+	NodeID     string
+	DataCenter string
+	Rack       string
+	FreeSlots  int
+	ShardCount int              // Total shards across all disks
+	Disks      []*DiskCandidate // All disks on this node
+}
+
+// PlacementConfig configures EC shard placement behavior
+type PlacementConfig struct {
+	// ShardsNeeded is the total number of shards to place
+	ShardsNeeded int
+
+	// MaxShardsPerServer limits how many shards can be placed on a single server.
+	// 0 means no limit (but prefer spreading when possible)
+	MaxShardsPerServer int
+
+	// MaxShardsPerRack limits how many shards can be placed in a single rack.
+	// 0 means no limit
+	MaxShardsPerRack int
+
+	// MaxTaskLoad is the maximum task load count for a disk to be considered
+	MaxTaskLoad int
+
+	// PreferDifferentServers when true, spreads shards across different servers
+	// before using multiple disks on the same server
+	PreferDifferentServers bool
+
+	// PreferDifferentRacks when true, spreads shards across different racks
+	// before using multiple servers in the same rack
+	PreferDifferentRacks bool
+}
+
+// DefaultConfig returns the default placement configuration
+func DefaultConfig() PlacementConfig {
+	return PlacementConfig{
+		ShardsNeeded:           14,
+		MaxShardsPerServer:     0,
+		MaxShardsPerRack:       0,
+		MaxTaskLoad:            5,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+}
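+
+// Illustrative usage sketch: callers typically start from DefaultConfig and
+// override only the fields they care about (the per-server cap below is a
+// hypothetical example, not a package default):
+//
+//	cfg := DefaultConfig()     // 14 shards, i.e. 10 data + 4 parity
+//	cfg.MaxShardsPerServer = 4 // hypothetical cap
+//	result, err := SelectDestinations(disks, cfg)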
+
+// PlacementResult contains the selected destinations for EC shards
+type PlacementResult struct {
+	SelectedDisks []*DiskCandidate
+
+	// Statistics
+	ServersUsed int
+	RacksUsed   int
+	DCsUsed     int
+
+	// Distribution maps
+	ShardsPerServer map[string]int
+	ShardsPerRack   map[string]int
+	ShardsPerDC     map[string]int
+}
+
+// SelectDestinations selects the best disks for EC shard placement.
+// This is the main entry point for EC placement logic.
+//
+// The algorithm works in multiple passes:
+// 1. First pass: Select one disk from each rack (maximize rack diversity)
+// 2. Second pass: Select disks from servers not yet used (maximize server diversity)
+// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
+func SelectDestinations(disks []*DiskCandidate, config PlacementConfig) (*PlacementResult, error) {
+	if len(disks) == 0 {
+		return nil, fmt.Errorf("no disk candidates provided")
+	}
+	if config.ShardsNeeded <= 0 {
+		return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
+	}
+
+	// Filter suitable disks
+	suitable := filterSuitableDisks(disks, config)
+	if len(suitable) == 0 {
+		return nil, fmt.Errorf("no suitable disks found after filtering")
+	}
+
+	// Build indexes for efficient lookup
+	rackToDisks := groupDisksByRack(suitable)
+	serverToDisks := groupDisksByServer(suitable)
+	_ = serverToDisks // Used for reference
+
+	result := &PlacementResult{
+		SelectedDisks:   make([]*DiskCandidate, 0, config.ShardsNeeded),
+		ShardsPerServer: make(map[string]int),
+		ShardsPerRack:   make(map[string]int),
+		ShardsPerDC:     make(map[string]int),
+	}
+
+	usedDisks := make(map[string]bool)   // "nodeID:diskID" -> bool
+	usedServers := make(map[string]bool) // nodeID -> bool
+	usedRacks := make(map[string]bool)   // "dc:rack" -> bool
+
+	// Pass 1: Select one disk from each rack (maximize rack diversity)
+	if config.PreferDifferentRacks {
+		// Sort racks by number of available servers (descending) so racks with
+		// more server options are considered first
+		sortedRacks := sortRacksByServerCount(rackToDisks)
+		for _, rackKey := range sortedRacks {
+			if len(result.SelectedDisks) >= config.ShardsNeeded {
+				break
+			}
+			rackDisks := rackToDisks[rackKey]
+			// Select best disk from this rack, preferring a new server
+			disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
+			if disk != nil {
+				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+			}
+		}
+	}
+
+	// Pass 2: Select disks from servers not yet used (any rack)
+	if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
+		for _, rackKey := range getSortedRackKeys(rackToDisks) {
+			if len(result.SelectedDisks) >= config.ShardsNeeded {
+				break
+			}
+			rackDisks := rackToDisks[rackKey]
+			for _, disk := range sortDisksByScore(rackDisks) {
+				if len(result.SelectedDisks) >= config.ShardsNeeded {
+					break
+				}
+				diskKey := getDiskKey(disk)
+				if usedDisks[diskKey] {
+					continue
+				}
+				// Skip if server already used (we want different servers in this pass)
+				if usedServers[disk.NodeID] {
+					continue
+				}
+				// Check server limit
+				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
+					continue
+				}
+				// Check rack limit
+				if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+					continue
+				}
+				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+			}
+		}
+	}
+
+	// Pass 3: Fill remaining slots from already-used servers (different disks)
+	// Use round-robin across servers to balance shards evenly
+	if len(result.SelectedDisks) < config.ShardsNeeded {
+		// Group remaining disks by server
+		serverToRemainingDisks := make(map[string][]*DiskCandidate)
+		for _, disk := range suitable {
+			if !usedDisks[getDiskKey(disk)] {
+				serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
+			}
+		}
+
+		// Sort each server's disks by score
+		for serverID := range serverToRemainingDisks {
+			serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
+		}
+
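+		// Illustrative example of the round-robin below: if servers A and B
+		// each already hold 1 shard and 6 more are needed, selection
+		// alternates A, B, A, B, A, B (ties break by name), so both finish
+		// with 4 shards.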
+		// Round-robin: repeatedly select from the server with the fewest shards
+		for len(result.SelectedDisks) < config.ShardsNeeded {
+			// Find server with fewest shards that still has available disks
+			var bestServer string
+			minShards := -1
+			for serverID, disks := range serverToRemainingDisks {
+				if len(disks) == 0 {
+					continue
+				}
+				// Check server limit
+				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
+					continue
+				}
+				shardCount := result.ShardsPerServer[serverID]
+				if minShards == -1 || shardCount < minShards {
+					minShards = shardCount
+					bestServer = serverID
+				} else if shardCount == minShards && serverID < bestServer {
+					// Tie-break by server name for determinism
+					bestServer = serverID
+				}
+			}
+
+			if bestServer == "" {
+				// No more servers with available disks
+				break
+			}
+
+			// Pop the best disk from this server
+			disks := serverToRemainingDisks[bestServer]
+			disk := disks[0]
+			serverToRemainingDisks[bestServer] = disks[1:]
+
+			// Check rack limit
+			if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+				continue
+			}
+
+			addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+		}
+	}
+
+	// Calculate final statistics
+	result.ServersUsed = len(usedServers)
+	result.RacksUsed = len(usedRacks)
+	dcSet := make(map[string]bool)
+	for _, disk := range result.SelectedDisks {
+		dcSet[disk.DataCenter] = true
+	}
+	result.DCsUsed = len(dcSet)
+
+	return result, nil
+}
+
+// filterSuitableDisks filters disks that are suitable for EC placement
+func filterSuitableDisks(disks []*DiskCandidate, config PlacementConfig) []*DiskCandidate {
+	var suitable []*DiskCandidate
+	for _, disk := range disks {
+		if disk.FreeSlots <= 0 {
+			continue
+		}
+		if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
+			continue
+		}
+		suitable = append(suitable, disk)
+	}
+	return suitable
+}
+
+// groupDisksByRack groups disks by their rack (dc:rack key)
+func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
+	result := make(map[string][]*DiskCandidate)
+	for _, disk := range disks {
+		key := getRackKey(disk)
+		result[key] = append(result[key], disk)
+	}
+	return result
+}
+
+// groupDisksByServer groups disks by their server
+func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate {
+	result := make(map[string][]*DiskCandidate)
+	for _, disk := range disks {
+		result[disk.NodeID] = append(result[disk.NodeID], disk)
+	}
+	return result
+}
+
+// getRackKey returns the unique key for a rack (dc:rack)
+func getRackKey(disk *DiskCandidate) string {
+	return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+}
+
+// getDiskKey returns the unique key for a disk (nodeID:diskID)
+func getDiskKey(disk *DiskCandidate) string {
+	return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+}
+
+// sortRacksByServerCount returns rack keys sorted by number of servers
+// (descending), so racks with more server options come first
+func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
+	// Count unique servers per rack
+	rackServerCount := make(map[string]int)
+	for rackKey, disks := range rackToDisks {
+		servers := make(map[string]bool)
+		for _, disk := range disks {
+			servers[disk.NodeID] = true
+		}
+		rackServerCount[rackKey] = len(servers)
+	}
+
+	keys := getSortedRackKeys(rackToDisks)
+	sort.Slice(keys, func(i, j int) bool {
+		// Sort by server count (descending) to pick from racks with more options first
+		return rackServerCount[keys[i]] > rackServerCount[keys[j]]
+	})
+	return keys
+}
+
+// getSortedRackKeys returns rack keys in a deterministic order
+func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
+	keys := make([]string, 0, len(rackToDisks))
+	for k := range rackToDisks {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
+
+// selectBestDiskFromRack selects the best disk from a rack for EC placement.
+// It prefers servers that haven't been used yet
+func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementConfig) *DiskCandidate {
+	var bestDisk *DiskCandidate
+	bestScore := -1.0
+	bestIsFromUnusedServer := false
+
+	for _, disk := range disks {
+		if usedDisks[getDiskKey(disk)] {
+			continue
+		}
+		isFromUnusedServer := !usedServers[disk.NodeID]
+		score := calculateDiskScore(disk)
+
+		// Prefer unused servers
+		if isFromUnusedServer && !bestIsFromUnusedServer {
+			bestDisk = disk
+			bestScore = score
+			bestIsFromUnusedServer = true
+		} else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore {
+			bestDisk = disk
+			bestScore = score
+		}
+	}
+
+	return bestDisk
+}
+
+// sortDisksByScore returns disks sorted by score (best first)
+func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
+	sorted := make([]*DiskCandidate, len(disks))
+	copy(sorted, disks)
+	sort.Slice(sorted, func(i, j int) bool {
+		return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
+	})
+	return sorted
+}
+
+// calculateDiskScore calculates a score for a disk candidate.
+// Higher score is better
+func calculateDiskScore(disk *DiskCandidate) float64 {
+	score := 0.0
+
+	// Primary factor: available capacity (lower utilization is better)
+	if disk.MaxVolumeCount > 0 {
+		utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
+		score += (1.0 - utilization) * 60.0 // Up to 60 points
+	} else {
+		score += 30.0 // Default if no max count
+	}
+
+	// Secondary factor: fewer shards already on this disk is better
+	score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points
+
+	// Tertiary factor: lower load is better
+	score += float64(10 - disk.LoadCount) // Up to 10 points
+
+	return score
+}
+
+// addDiskToResult adds a disk to the result and updates tracking maps
+func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
+	usedDisks, usedServers, usedRacks map[string]bool) {
+	diskKey := getDiskKey(disk)
+	rackKey := getRackKey(disk)
+
+	result.SelectedDisks = append(result.SelectedDisks, disk)
+	usedDisks[diskKey] = true
+	usedServers[disk.NodeID] = true
+	usedRacks[rackKey] = true
+	result.ShardsPerServer[disk.NodeID]++
+	result.ShardsPerRack[rackKey]++
+	result.ShardsPerDC[disk.DataCenter]++
+}
+
+// VerifySpread checks if the placement result meets diversity requirements
+func VerifySpread(result *PlacementResult, minServers, minRacks int) error {
+	if result.ServersUsed < minServers {
+		return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers)
+	}
+	if result.RacksUsed < minRacks {
+		return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks)
+	}
+	return nil
+}
+
+// CalculateIdealDistribution returns the ideal number of shards per server
+// when we have a certain number of shards and servers
+func CalculateIdealDistribution(totalShards, numServers int) (min, max int) {
+	if numServers <= 0 {
+		return 0, totalShards
+	}
+	min = totalShards / numServers
+	max = min
+	if totalShards%numServers != 0 {
+		max = min + 1
+	}
+	return
+}
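+
+// Worked example: CalculateIdealDistribution(14, 4) returns (3, 4) - every
+// server gets at least 3 shards and the 2 leftover shards push two servers
+// up to 4 (the same case is covered by the unit tests below).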
diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go
new file mode 100644
index 000000000..9d0312179
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement_test.go
@@ -0,0 +1,517 @@
+package placement
+
+import (
+	"testing"
+)
+
+// Helper function to create disk candidates for testing
+func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate {
+	return &DiskCandidate{
+		NodeID:         nodeID,
+		DiskID:         diskID,
+		DataCenter:     dc,
+		Rack:           rack,
+		VolumeCount:    0,
+		MaxVolumeCount: 100,
+		ShardCount:     0,
+		FreeSlots:      freeSlots,
+		LoadCount:      0,
+	}
+}
+
+func TestSelectDestinations_SingleRack(t *testing.T) {
+	// Test: 3 servers in same rack, each with 2 disks, need 6 shards
+	// Expected: Should spread across all 6 disks (one per disk)
+	disks := []*DiskCandidate{
+		makeDisk("server1", 0, "dc1", "rack1", 10),
+		makeDisk("server1", 1, "dc1", "rack1", 10),
+		makeDisk("server2", 0, "dc1", "rack1", 10),
+		makeDisk("server2", 1, "dc1", "rack1", 10),
+		makeDisk("server3", 0, "dc1", "rack1", 10),
+		makeDisk("server3", 1, "dc1", "rack1", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           6,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if len(result.SelectedDisks) != 6 {
+		t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+	}
+
+	// Verify all 3 servers are used
+	if result.ServersUsed != 3 {
+		t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+	}
+
+	// Verify each disk is unique
+	diskSet := make(map[string]bool)
+	for _, disk := range result.SelectedDisks {
+		key := getDiskKey(disk)
+		if diskSet[key] {
+			t.Errorf("disk %s selected multiple times", key)
+		}
+		diskSet[key] = true
+	}
+}
+
+func TestSelectDestinations_MultipleRacks(t *testing.T) {
+	// Test: 2 racks with 2 servers each, each server has 2 disks
+	// Need 8 shards
+	// Expected: Should spread across all 8 disks
+	disks := []*DiskCandidate{
+		makeDisk("server1", 0, "dc1", "rack1", 10),
+		makeDisk("server1", 1, "dc1", "rack1", 10),
+		makeDisk("server2", 0, "dc1", "rack1", 10),
+		makeDisk("server2", 1, "dc1", "rack1", 10),
+		makeDisk("server3", 0, "dc1", "rack2", 10),
+		makeDisk("server3", 1, "dc1", "rack2", 10),
+		makeDisk("server4", 0, "dc1", "rack2", 10),
+		makeDisk("server4", 1, "dc1", "rack2", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           8,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if len(result.SelectedDisks) != 8 {
+		t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+	}
+
+	// Verify all 4 servers are used
+	if result.ServersUsed != 4 {
+		t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+	}
+
+	// Verify both racks are used
+	if result.RacksUsed != 2 {
+		t.Errorf("expected 2 racks used, got %d", result.RacksUsed)
+	}
+}
makeDisk("server3", 1, "dc1", "rack1", 10), + makeDisk("server3", 2, "dc1", "rack1", 10), + makeDisk("server3", 3, "dc1", "rack1", 10), + makeDisk("server4", 0, "dc1", "rack1", 10), + makeDisk("server4", 1, "dc1", "rack1", 10), + makeDisk("server4", 2, "dc1", "rack1", 10), + makeDisk("server4", 3, "dc1", "rack1", 10), + } + + config := PlacementConfig{ + ShardsNeeded: 4, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 4 servers are used (one shard per server) + if result.ServersUsed != 4 { + t.Errorf("expected 4 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 1 shard + for server, count := range result.ShardsPerServer { + if count != 1 { + t.Errorf("server %s has %d shards, expected 1", server, count) + } + } +} + +func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards + // Expected: First pick one from each server (2 shards), then one more from each (4 shards), + // then fill remaining from any server (6 shards) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementConfig{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // Both servers should be used + if result.ServersUsed != 2 { + t.Errorf("expected 2 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 3 shards (balanced) + for server, count := range result.ShardsPerServer { + if count != 3 { + t.Errorf("server %s has %d shards, expected 3", server, count) + } + } +} + +func TestSelectDestinations_MaxShardsPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server + // Expected: Should only select 4 shards (2 per server limit) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementConfig{ + ShardsNeeded: 6, + MaxShardsPerServer: 2, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only get 4 shards due to server limit + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks)) + } + + // No server should exceed the limit + for server, count := range result.ShardsPerServer { + if 
+
+func TestSelectDestinations_MaxShardsPerServer(t *testing.T) {
+	// Test: 2 servers with 4 disks each, need 6 shards, max 2 per server
+	// Expected: Should only select 4 shards (2 per server limit)
+	disks := []*DiskCandidate{
+		makeDisk("server1", 0, "dc1", "rack1", 10),
+		makeDisk("server1", 1, "dc1", "rack1", 10),
+		makeDisk("server1", 2, "dc1", "rack1", 10),
+		makeDisk("server1", 3, "dc1", "rack1", 10),
+		makeDisk("server2", 0, "dc1", "rack1", 10),
+		makeDisk("server2", 1, "dc1", "rack1", 10),
+		makeDisk("server2", 2, "dc1", "rack1", 10),
+		makeDisk("server2", 3, "dc1", "rack1", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           6,
+		MaxShardsPerServer:     2,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Should only get 4 shards due to server limit
+	if len(result.SelectedDisks) != 4 {
+		t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks))
+	}
+
+	// No server should exceed the limit
+	for server, count := range result.ShardsPerServer {
+		if count > 2 {
+			t.Errorf("server %s has %d shards, exceeds limit of 2", server, count)
+		}
+	}
+}
+
+func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) {
+	// Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each
+	// Expected: Should spread evenly (2 shards per server)
+	var disks []*DiskCandidate
+	for i := 1; i <= 7; i++ {
+		serverID := "server" + string(rune('0'+i))
+		disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10))
+		disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10))
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           14,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if len(result.SelectedDisks) != 14 {
+		t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks))
+	}
+
+	// All 7 servers should be used
+	if result.ServersUsed != 7 {
+		t.Errorf("expected 7 servers used, got %d", result.ServersUsed)
+	}
+
+	// Each server should have exactly 2 shards
+	for server, count := range result.ShardsPerServer {
+		if count != 2 {
+			t.Errorf("server %s has %d shards, expected 2", server, count)
+		}
+	}
+}
+
+func TestSelectDestinations_FewerServersThanShards(t *testing.T) {
+	// Test: Only 3 servers but need 6 shards
+	// Expected: Should distribute evenly (2 per server)
+	disks := []*DiskCandidate{
+		makeDisk("server1", 0, "dc1", "rack1", 10),
+		makeDisk("server1", 1, "dc1", "rack1", 10),
+		makeDisk("server1", 2, "dc1", "rack1", 10),
+		makeDisk("server2", 0, "dc1", "rack1", 10),
+		makeDisk("server2", 1, "dc1", "rack1", 10),
+		makeDisk("server2", 2, "dc1", "rack1", 10),
+		makeDisk("server3", 0, "dc1", "rack1", 10),
+		makeDisk("server3", 1, "dc1", "rack1", 10),
+		makeDisk("server3", 2, "dc1", "rack1", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           6,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if len(result.SelectedDisks) != 6 {
+		t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+	}
+
+	// All 3 servers should be used
+	if result.ServersUsed != 3 {
+		t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+	}
+
+	// Each server should have exactly 2 shards
+	for server, count := range result.ShardsPerServer {
+		if count != 2 {
+			t.Errorf("server %s has %d shards, expected 2", server, count)
+		}
+	}
+}
+
+func TestSelectDestinations_NoSuitableDisks(t *testing.T) {
+	// Test: All disks have no free slots
+	disks := []*DiskCandidate{
+		{NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+		{NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           4,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	_, err := SelectDestinations(disks, config)
+	if err == nil {
+		t.Error("expected error for no suitable disks, got nil")
+	}
+}
+
+func TestSelectDestinations_EmptyInput(t *testing.T) {
+	config := DefaultConfig()
+	_, err := SelectDestinations([]*DiskCandidate{}, config)
+	if err == nil {
+		t.Error("expected error for empty input, got nil")
+	}
+}
+
+func TestSelectDestinations_FiltersByLoad(t *testing.T) {
+	// Test: Some disks have too high load
+	disks := []*DiskCandidate{
+		{NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10},
+		{NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2},
+		{NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1},
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           2,
+		MaxTaskLoad:            5,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Should only select from server2 and server3 (server1 has too high load)
+	for _, disk := range result.SelectedDisks {
+		if disk.NodeID == "server1" {
+			t.Errorf("disk from server1 should not be selected (load too high)")
+		}
+	}
+}
+
+func TestCalculateDiskScore(t *testing.T) {
+	// Test that score calculation works as expected
+	lowUtilDisk := &DiskCandidate{
+		VolumeCount:    10,
+		MaxVolumeCount: 100,
+		ShardCount:     0,
+		LoadCount:      0,
+	}
+
+	highUtilDisk := &DiskCandidate{
+		VolumeCount:    90,
+		MaxVolumeCount: 100,
+		ShardCount:     5,
+		LoadCount:      5,
+	}
+
+	lowScore := calculateDiskScore(lowUtilDisk)
+	highScore := calculateDiskScore(highUtilDisk)
+
+	if lowScore <= highScore {
+		t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore)
+	}
+}
+
+func TestCalculateIdealDistribution(t *testing.T) {
+	tests := []struct {
+		totalShards int
+		numServers  int
+		expectedMin int
+		expectedMax int
+	}{
+		{14, 7, 2, 2},  // Even distribution
+		{14, 4, 3, 4},  // Uneven: 14/4 = 3 remainder 2
+		{6, 3, 2, 2},   // Even distribution
+		{7, 3, 2, 3},   // Uneven: 7/3 = 2 remainder 1
+		{10, 0, 0, 10}, // Edge case: no servers
+		{0, 5, 0, 0},   // Edge case: no shards
+	}
+
+	for _, tt := range tests {
+		min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers)
+		if min != tt.expectedMin || max != tt.expectedMax {
+			t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)",
+				tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax)
+		}
+	}
+}
+
+func TestVerifySpread(t *testing.T) {
+	result := &PlacementResult{
+		ServersUsed: 3,
+		RacksUsed:   2,
+	}
+
+	// Should pass
+	if err := VerifySpread(result, 3, 2); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+
+	// Should fail - not enough servers
+	if err := VerifySpread(result, 4, 2); err == nil {
+		t.Error("expected error for insufficient servers")
+	}
+
+	// Should fail - not enough racks
+	if err := VerifySpread(result, 3, 3); err == nil {
+		t.Error("expected error for insufficient racks")
+	}
+}
+
+func TestSelectDestinations_MultiDC(t *testing.T) {
+	// Test: 2 DCs, each with 2 racks, each rack has 2 servers
+	disks := []*DiskCandidate{
+		// DC1, Rack1
+		makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10),
+		makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10),
+		makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10),
+		makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10),
+		// DC1, Rack2
+		makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10),
+		makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10),
+		makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10),
+		makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10),
+		// DC2, Rack1
+		makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10),
+		makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10),
+		makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10),
+		makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10),
+		// DC2, Rack2
+		makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10),
+		makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10),
+		makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10),
+		makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           8,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	if len(result.SelectedDisks) != 8 {
+		t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+	}
+
+	// Should use all 4 racks
+	if result.RacksUsed != 4 {
+		t.Errorf("expected 4 racks used, got %d", result.RacksUsed)
+	}
+
+	// Should use both DCs
+	if result.DCsUsed != 2 {
+		t.Errorf("expected 2 DCs used, got %d", result.DCsUsed)
+	}
+}
+
+func TestSelectDestinations_SameRackDifferentDC(t *testing.T) {
+	// Test: Same rack name in different DCs should be treated as different racks
+	disks := []*DiskCandidate{
+		makeDisk("dc1-s1", 0, "dc1", "rack1", 10),
+		makeDisk("dc2-s1", 0, "dc2", "rack1", 10),
+	}
+
+	config := PlacementConfig{
+		ShardsNeeded:           2,
+		PreferDifferentServers: true,
+		PreferDifferentRacks:   true,
+	}
+
+	result, err := SelectDestinations(disks, config)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Should use 2 racks (dc1:rack1 and dc2:rack1 are different)
+	if result.RacksUsed != 2 {
+		t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed)
+	}
+}
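+
+// TestSelectDestinations_UsageSketch is an illustrative, end-to-end sketch of
+// how a caller might combine SelectDestinations, VerifySpread, and
+// CalculateIdealDistribution; the topology here is hypothetical.
+func TestSelectDestinations_UsageSketch(t *testing.T) {
+	// Two racks in one DC, two servers per rack, two disks per server.
+	var disks []*DiskCandidate
+	for _, rack := range []string{"rack1", "rack2"} {
+		for s := 1; s <= 2; s++ {
+			serverID := rack + "-server" + string(rune('0'+s))
+			disks = append(disks, makeDisk(serverID, 0, "dc1", rack, 10))
+			disks = append(disks, makeDisk(serverID, 1, "dc1", rack, 10))
+		}
+	}
+
+	cfg := DefaultConfig()
+	cfg.ShardsNeeded = 8
+
+	result, err := SelectDestinations(disks, cfg)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// A caller typically validates the spread before acting on the result.
+	if err := VerifySpread(result, 4, 2); err != nil {
+		t.Errorf("spread check failed: %v", err)
+	}
+
+	// With 8 shards on 4 servers the ideal distribution is exactly 2 per server.
+	minPer, maxPer := CalculateIdealDistribution(cfg.ShardsNeeded, result.ServersUsed)
+	if minPer != 2 || maxPer != 2 {
+		t.Errorf("expected ideal distribution (2, 2), got (%d, %d)", minPer, maxPer)
+	}
+	for server, count := range result.ShardsPerServer {
+		if count < minPer || count > maxPer {
+			t.Errorf("server %s has %d shards, want between %d and %d", server, count, minPer, maxPer)
+		}
+	}
+}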