From 425a54019b716b85e6aab0429bfac0ba381a0094 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 12 Aug 2025 09:45:30 -0700 Subject: [PATCH] adding EcVacuumLogic --- .../ec_vacuum_generation_unit_test.go | 4 + .../worker/tasks/ec_vacuum/ec_vacuum_logic.go | 253 +++++++ .../tasks/ec_vacuum/ec_vacuum_logic_test.go | 687 ++++++++++++++++++ .../ec_vacuum/ec_vacuum_scenarios_test.go | 582 +++++++++++++++ .../tasks/ec_vacuum/safety_checks_test.go | 2 +- 5 files changed, 1527 insertions(+), 1 deletion(-) create mode 100644 weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go create mode 100644 weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go create mode 100644 weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go index b9e08ab4d..963479be2 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go @@ -237,6 +237,10 @@ func TestEcVacuumActivateNewGeneration(t *testing.T) { task := NewEcVacuumTask("activate-test", volumeId, collection, sourceNodes) + // Set generations manually for this test (normally done by Execute via task parameters) + task.sourceGeneration = 0 + task.targetGeneration = 1 + // Simulate the activation step ctx := context.Background() diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go new file mode 100644 index 000000000..7ef8a92db --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go @@ -0,0 +1,253 @@ +package ec_vacuum + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// EcVacuumLogic contains the core business logic for EC vacuum operations +// This is extracted from EcVacuumTask to make it easily testable +type EcVacuumLogic struct{} + +// NewEcVacuumLogic creates a new instance of the core logic +func NewEcVacuumLogic() *EcVacuumLogic { + return &EcVacuumLogic{} +} + +// GenerationPlan represents a plan for generation transitions during vacuum +type GenerationPlan struct { + VolumeID uint32 + SourceGeneration uint32 + TargetGeneration uint32 + SourceNodes map[pb.ServerAddress]erasure_coding.ShardBits + CleanupPlan []uint32 // Generations to be cleaned up +} + +// ShardDistribution represents how shards are distributed across nodes +type ShardDistribution struct { + Generation uint32 + Nodes map[pb.ServerAddress]erasure_coding.ShardBits +} + +// VacuumPlan represents the complete plan for an EC vacuum operation +type VacuumPlan struct { + VolumeID uint32 + Collection string + CurrentGeneration uint32 + TargetGeneration uint32 + SourceDistribution ShardDistribution + ExpectedDistribution ShardDistribution + GenerationsToCleanup []uint32 + SafetyChecks []string +} + +// DetermineGenerationsFromParams extracts generation information from task parameters +func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.TaskParams) (sourceGen, targetGen uint32, err error) { + if params == nil { + return 0, 0, fmt.Errorf("task parameters cannot be nil") + } + + if len(params.Sources) == 0 { + // Fallback to safe defaults for backward compatibility + return 0, 1, nil + } + + // Use generation from first source (all sources should have same generation) + if params.Sources[0].Generation > 0 { + sourceGen = params.Sources[0].Generation + targetGen = 
sourceGen + 1 + } else { + // Generation 0 case + sourceGen = 0 + targetGen = 1 + } + + // Validate consistency - all sources should have the same generation + for i, source := range params.Sources { + if source.Generation != sourceGen { + return 0, 0, fmt.Errorf("inconsistent generations in sources: source[0]=%d, source[%d]=%d", + sourceGen, i, source.Generation) + } + } + + return sourceGen, targetGen, nil +} + +// ParseSourceNodes extracts source node information from task parameters +func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[pb.ServerAddress]erasure_coding.ShardBits, error) { + if params == nil { + return nil, fmt.Errorf("task parameters cannot be nil") + } + + sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits) + + for _, source := range params.Sources { + if source.Node == "" { + continue + } + + serverAddr := pb.ServerAddress(source.Node) + var shardBits erasure_coding.ShardBits + + // Convert shard IDs to ShardBits + for _, shardId := range source.ShardIds { + if shardId < erasure_coding.TotalShardsCount { + shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + } + + if shardBits.ShardIdCount() > 0 { + sourceNodes[serverAddr] = shardBits + } + } + + if len(sourceNodes) == 0 { + return nil, fmt.Errorf("no valid source nodes found: sources=%d", len(params.Sources)) + } + + return sourceNodes, nil +} + +// CreateVacuumPlan creates a comprehensive plan for the EC vacuum operation +func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, params *worker_pb.TaskParams) (*VacuumPlan, error) { + // Extract generations + sourceGen, targetGen, err := logic.DetermineGenerationsFromParams(params) + if err != nil { + return nil, fmt.Errorf("failed to determine generations: %w", err) + } + + // Parse source nodes + sourceNodes, err := logic.ParseSourceNodes(params) + if err != nil { + return nil, fmt.Errorf("failed to parse source nodes: %w", err) + } + + // Create source distribution + sourceDistribution := ShardDistribution{ + Generation: sourceGen, + Nodes: sourceNodes, + } + + // Expected distribution is same nodes but with target generation + expectedDistribution := ShardDistribution{ + Generation: targetGen, + Nodes: sourceNodes, // Same nodes, new generation + } + + // Determine what to cleanup (simplified: just source generation) + generationsToCleanup := []uint32{sourceGen} + + // Generate safety checks + safetyChecks := logic.generateSafetyChecks(sourceDistribution, targetGen) + + return &VacuumPlan{ + VolumeID: volumeID, + Collection: collection, + CurrentGeneration: sourceGen, + TargetGeneration: targetGen, + SourceDistribution: sourceDistribution, + ExpectedDistribution: expectedDistribution, + GenerationsToCleanup: generationsToCleanup, + SafetyChecks: safetyChecks, + }, nil +} + +// ValidateShardDistribution validates that the shard distribution is sufficient for vacuum +func (logic *EcVacuumLogic) ValidateShardDistribution(distribution ShardDistribution) error { + totalShards := erasure_coding.ShardBits(0) + + for _, shardBits := range distribution.Nodes { + totalShards = totalShards.Plus(shardBits) + } + + shardCount := totalShards.ShardIdCount() + if shardCount < erasure_coding.DataShardsCount { + return fmt.Errorf("insufficient shards for reconstruction: have %d, need at least %d", + shardCount, erasure_coding.DataShardsCount) + } + + return nil +} + +// CalculateCleanupGenerations determines which generations should be cleaned up +func (logic *EcVacuumLogic) 
CalculateCleanupGenerations(currentGen, targetGen uint32, availableGenerations []uint32) []uint32 { + var toCleanup []uint32 + + for _, gen := range availableGenerations { + // Don't clean up the target generation + if gen != targetGen { + toCleanup = append(toCleanup, gen) + } + } + + return toCleanup +} + +// generateSafetyChecks creates a list of safety checks for the vacuum plan +func (logic *EcVacuumLogic) generateSafetyChecks(distribution ShardDistribution, targetGen uint32) []string { + var checks []string + + // Check 1: Sufficient shards + totalShards := erasure_coding.ShardBits(0) + for _, shardBits := range distribution.Nodes { + totalShards = totalShards.Plus(shardBits) + } + + checks = append(checks, fmt.Sprintf("Total shards available: %d/%d", + totalShards.ShardIdCount(), erasure_coding.TotalShardsCount)) + + // Check 2: Minimum data shards + if totalShards.ShardIdCount() >= erasure_coding.DataShardsCount { + checks = append(checks, "✅ Sufficient data shards for reconstruction") + } else { + checks = append(checks, "❌ INSUFFICIENT data shards for reconstruction") + } + + // Check 3: Node distribution + checks = append(checks, fmt.Sprintf("Shard distribution across %d nodes", len(distribution.Nodes))) + + // Check 4: Generation safety + checks = append(checks, fmt.Sprintf("Target generation %d != source generation %d", + targetGen, distribution.Generation)) + + return checks +} + +// EstimateCleanupImpact estimates the storage impact of cleanup operations +func (logic *EcVacuumLogic) EstimateCleanupImpact(plan *VacuumPlan, volumeSize uint64) CleanupImpact { + // Estimate size per generation + sizePerGeneration := volumeSize + + // Calculate total cleanup impact + var totalCleanupSize uint64 + for range plan.GenerationsToCleanup { + totalCleanupSize += sizePerGeneration + } + + return CleanupImpact{ + GenerationsToCleanup: len(plan.GenerationsToCleanup), + EstimatedSizeFreed: totalCleanupSize, + NodesAffected: len(plan.SourceDistribution.Nodes), + ShardsToDelete: logic.countShardsToDelete(plan), + } +} + +// CleanupImpact represents the estimated impact of cleanup operations +type CleanupImpact struct { + GenerationsToCleanup int + EstimatedSizeFreed uint64 + NodesAffected int + ShardsToDelete int +} + +// countShardsToDelete counts how many shard files will be deleted +func (logic *EcVacuumLogic) countShardsToDelete(plan *VacuumPlan) int { + totalShards := 0 + for _, shardBits := range plan.SourceDistribution.Nodes { + totalShards += shardBits.ShardIdCount() + } + return totalShards * len(plan.GenerationsToCleanup) +} diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go new file mode 100644 index 000000000..e4bc242fa --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go @@ -0,0 +1,687 @@ +package ec_vacuum + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +func TestDetermineGenerationsFromParams(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + params *worker_pb.TaskParams + expectSrc uint32 + expectTgt uint32 + expectError bool + }{ + { + name: "nil params", + params: nil, + expectError: true, + }, + { + name: "empty sources - fallback to defaults", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{}, + }, + expectSrc: 0, + expectTgt: 1, + }, + { + name: "generation 0 source", + params: 
&worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 0}, + }, + }, + expectSrc: 0, + expectTgt: 1, + }, + { + name: "generation 1 source", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 1}, + }, + }, + expectSrc: 1, + expectTgt: 2, + }, + { + name: "generation 5 source", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 5}, + }, + }, + expectSrc: 5, + expectTgt: 6, + }, + { + name: "inconsistent generations", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 1}, + {Generation: 2}, // Different generation! + }, + }, + expectError: true, + }, + { + name: "multiple sources same generation", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 3}, + {Generation: 3}, + {Generation: 3}, + }, + }, + expectSrc: 3, + expectTgt: 4, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srcGen, tgtGen, err := logic.DetermineGenerationsFromParams(tt.params) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if srcGen != tt.expectSrc { + t.Errorf("source generation: expected %d, got %d", tt.expectSrc, srcGen) + } + + if tgtGen != tt.expectTgt { + t.Errorf("target generation: expected %d, got %d", tt.expectTgt, tgtGen) + } + }) + } +} + +func TestParseSourceNodes(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + params *worker_pb.TaskParams + expectNodes int + expectShards map[string][]int // node -> shard IDs + expectError bool + }{ + { + name: "nil params", + params: nil, + expectError: true, + }, + { + name: "empty sources", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{}, + }, + expectError: true, + }, + { + name: "single node with shards", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + ShardIds: []uint32{0, 1, 2, 3, 4, 5}, + }, + }, + }, + expectNodes: 1, + expectShards: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4, 5}, + }, + }, + { + name: "multiple nodes with different shards", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + ShardIds: []uint32{0, 1, 2, 3, 4}, + }, + { + Node: "node2:8080", + ShardIds: []uint32{5, 6, 7, 8, 9}, + }, + { + Node: "node3:8080", + ShardIds: []uint32{10, 11, 12, 13}, + }, + }, + }, + expectNodes: 3, + expectShards: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + }, + { + name: "overlapping shards across nodes", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + ShardIds: []uint32{0, 1, 2}, + }, + { + Node: "node2:8080", + ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes + }, + }, + }, + expectNodes: 2, + expectShards: map[string][]int{ + "node1:8080": {0, 1, 2}, + "node2:8080": {0, 3, 4}, + }, + }, + { + name: "empty node name ignored", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "", + ShardIds: []uint32{0, 1, 2}, + }, + { + Node: "node1:8080", + ShardIds: []uint32{3, 4, 5}, + }, + }, + }, + expectNodes: 1, + expectShards: map[string][]int{ + "node1:8080": {3, 4, 5}, + }, + }, + { + name: "invalid shard IDs filtered out", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + ShardIds: []uint32{0, 1, 
14, 15, 100}, // 14+ are invalid + }, + }, + }, + expectNodes: 1, + expectShards: map[string][]int{ + "node1:8080": {0, 1}, // Only valid shards + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sourceNodes, err := logic.ParseSourceNodes(tt.params) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if len(sourceNodes) != tt.expectNodes { + t.Errorf("node count: expected %d, got %d", tt.expectNodes, len(sourceNodes)) + return + } + + // Verify shard distribution + for nodeAddr, expectedShardIds := range tt.expectShards { + shardBits, exists := sourceNodes[pb.ServerAddress(nodeAddr)] + if !exists { + t.Errorf("expected node %s not found", nodeAddr) + continue + } + + // Convert ShardBits back to slice for comparison + var actualShardIds []int + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if shardBits.HasShardId(erasure_coding.ShardId(i)) { + actualShardIds = append(actualShardIds, i) + } + } + + if len(actualShardIds) != len(expectedShardIds) { + t.Errorf("node %s shard count: expected %d, got %d", + nodeAddr, len(expectedShardIds), len(actualShardIds)) + continue + } + + // Check each expected shard + for _, expectedId := range expectedShardIds { + found := false + for _, actualId := range actualShardIds { + if actualId == expectedId { + found = true + break + } + } + if !found { + t.Errorf("node %s missing expected shard %d", nodeAddr, expectedId) + } + } + } + }) + } +} + +func TestValidateShardDistribution(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + distribution ShardDistribution + expectError bool + description string + }{ + { + name: "sufficient shards for reconstruction", + distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2, 3, 4}), + "node2:8080": createShardBits([]int{5, 6, 7, 8, 9}), + }, + }, + expectError: false, + description: "10 shards >= 10 data shards required", + }, + { + name: "exactly minimum data shards", + distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2, 3, 4}), + "node2:8080": createShardBits([]int{5, 6, 7, 8, 9}), + }, + }, + expectError: false, + description: "Exactly 10 data shards", + }, + { + name: "insufficient shards", + distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2}), + "node2:8080": createShardBits([]int{3, 4, 5}), + }, + }, + expectError: true, + description: "Only 6 shards < 10 data shards required", + }, + { + name: "all shards available", + distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2, 3, 4}), + "node2:8080": createShardBits([]int{5, 6, 7, 8, 9}), + "node3:8080": createShardBits([]int{10, 11, 12, 13}), + }, + }, + expectError: false, + description: "All 14 shards available", + }, + { + name: "single node with all shards", + distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + }, + }, + expectError: false, + description: "All shards on single node", + }, + { + name: "empty distribution", + 
distribution: ShardDistribution{ + Generation: 1, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{}, + }, + expectError: true, + description: "No shards available", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := logic.ValidateShardDistribution(tt.distribution) + + if tt.expectError && err == nil { + t.Errorf("expected error for %s but got none", tt.description) + } + + if !tt.expectError && err != nil { + t.Errorf("unexpected error for %s: %v", tt.description, err) + } + }) + } +} + +func TestCreateVacuumPlan(t *testing.T) { + logic := NewEcVacuumLogic() + + tests := []struct { + name string + volumeID uint32 + collection string + params *worker_pb.TaskParams + expectError bool + validate func(*testing.T, *VacuumPlan) + }{ + { + name: "basic generation 0 to 1 plan", + volumeID: 123, + collection: "test", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 0, + ShardIds: []uint32{0, 1, 2, 3, 4, 5}, + }, + { + Node: "node2:8080", + Generation: 0, + ShardIds: []uint32{6, 7, 8, 9, 10, 11, 12, 13}, + }, + }, + }, + validate: func(t *testing.T, plan *VacuumPlan) { + if plan.VolumeID != 123 { + t.Errorf("volume ID: expected 123, got %d", plan.VolumeID) + } + if plan.Collection != "test" { + t.Errorf("collection: expected 'test', got '%s'", plan.Collection) + } + if plan.CurrentGeneration != 0 { + t.Errorf("current generation: expected 0, got %d", plan.CurrentGeneration) + } + if plan.TargetGeneration != 1 { + t.Errorf("target generation: expected 1, got %d", plan.TargetGeneration) + } + if len(plan.GenerationsToCleanup) != 1 || plan.GenerationsToCleanup[0] != 0 { + t.Errorf("cleanup generations: expected [0], got %v", plan.GenerationsToCleanup) + } + if len(plan.SourceDistribution.Nodes) != 2 { + t.Errorf("source nodes: expected 2, got %d", len(plan.SourceDistribution.Nodes)) + } + if len(plan.ExpectedDistribution.Nodes) != 2 { + t.Errorf("expected nodes: expected 2, got %d", len(plan.ExpectedDistribution.Nodes)) + } + }, + }, + { + name: "generation 3 to 4 plan", + volumeID: 456, + collection: "data", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + { + Node: "node1:8080", + Generation: 3, + ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + Node: "node2:8080", + Generation: 3, + ShardIds: []uint32{10, 11, 12, 13}, + }, + }, + }, + validate: func(t *testing.T, plan *VacuumPlan) { + if plan.CurrentGeneration != 3 { + t.Errorf("current generation: expected 3, got %d", plan.CurrentGeneration) + } + if plan.TargetGeneration != 4 { + t.Errorf("target generation: expected 4, got %d", plan.TargetGeneration) + } + if len(plan.GenerationsToCleanup) != 1 || plan.GenerationsToCleanup[0] != 3 { + t.Errorf("cleanup generations: expected [3], got %v", plan.GenerationsToCleanup) + } + }, + }, + { + name: "inconsistent generations", + volumeID: 789, + collection: "test", + params: &worker_pb.TaskParams{ + Sources: []*worker_pb.TaskSource{ + {Generation: 1}, + {Generation: 2}, + }, + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plan, err := logic.CreateVacuumPlan(tt.volumeID, tt.collection, tt.params) + + if tt.expectError { + if err == nil { + t.Errorf("expected error but got none") + } + return + } + + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + + if tt.validate != nil { + tt.validate(t, plan) + } + }) + } +} + +func TestCalculateCleanupGenerations(t *testing.T) { + logic := 
NewEcVacuumLogic() + + tests := []struct { + name string + currentGen uint32 + targetGen uint32 + availableGenerations []uint32 + expectedCleanup []uint32 + }{ + { + name: "single generation cleanup", + currentGen: 0, + targetGen: 1, + availableGenerations: []uint32{0, 1}, + expectedCleanup: []uint32{0}, // Don't cleanup target generation 1 + }, + { + name: "multiple generations cleanup", + currentGen: 2, + targetGen: 3, + availableGenerations: []uint32{0, 1, 2, 3}, + expectedCleanup: []uint32{0, 1, 2}, // Don't cleanup target generation 3 + }, + { + name: "no cleanup needed", + currentGen: 0, + targetGen: 1, + availableGenerations: []uint32{1}, + expectedCleanup: []uint32{}, // Only target generation exists + }, + { + name: "cleanup all except target", + currentGen: 5, + targetGen: 6, + availableGenerations: []uint32{0, 1, 2, 3, 4, 5, 6}, + expectedCleanup: []uint32{0, 1, 2, 3, 4, 5}, // Don't cleanup target generation 6 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := logic.CalculateCleanupGenerations(tt.currentGen, tt.targetGen, tt.availableGenerations) + + if len(result) != len(tt.expectedCleanup) { + t.Errorf("cleanup generations length: expected %d, got %d", len(tt.expectedCleanup), len(result)) + return + } + + // Convert to map for easier comparison + expectedMap := make(map[uint32]bool) + for _, gen := range tt.expectedCleanup { + expectedMap[gen] = true + } + + for _, gen := range result { + if !expectedMap[gen] { + t.Errorf("unexpected generation in cleanup: %d", gen) + } + delete(expectedMap, gen) + } + + // Check for missing generations + for gen := range expectedMap { + t.Errorf("missing generation in cleanup: %d", gen) + } + }) + } +} + +func TestEstimateCleanupImpact(t *testing.T) { + logic := NewEcVacuumLogic() + + plan := &VacuumPlan{ + VolumeID: 123, + CurrentGeneration: 2, + TargetGeneration: 3, + SourceDistribution: ShardDistribution{ + Generation: 2, + Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{ + "node1:8080": createShardBits([]int{0, 1, 2, 3, 4}), + "node2:8080": createShardBits([]int{5, 6, 7, 8, 9}), + "node3:8080": createShardBits([]int{10, 11, 12, 13}), + }, + }, + GenerationsToCleanup: []uint32{0, 1, 2}, // 3 generations to cleanup + } + + volumeSize := uint64(1000000) // 1MB + + impact := logic.EstimateCleanupImpact(plan, volumeSize) + + if impact.GenerationsToCleanup != 3 { + t.Errorf("generations to cleanup: expected 3, got %d", impact.GenerationsToCleanup) + } + + if impact.EstimatedSizeFreed != 3000000 { // 3 generations * 1MB each + t.Errorf("estimated size freed: expected 3000000, got %d", impact.EstimatedSizeFreed) + } + + if impact.NodesAffected != 3 { + t.Errorf("nodes affected: expected 3, got %d", impact.NodesAffected) + } + + expectedShardsToDelete := (5 + 5 + 4) * 3 // Total shards per generation * generations + if impact.ShardsToDelete != expectedShardsToDelete { + t.Errorf("shards to delete: expected %d, got %d", expectedShardsToDelete, impact.ShardsToDelete) + } +} + +// Helper function to create ShardBits from shard ID slice +func createShardBits(shardIds []int) erasure_coding.ShardBits { + var bits erasure_coding.ShardBits + for _, id := range shardIds { + bits = bits.AddShardId(erasure_coding.ShardId(id)) + } + return bits +} + +// Test helper to create realistic topology scenarios +func createRealisticTopologyTest(t *testing.T) { + logic := NewEcVacuumLogic() + + // Scenario: 3-node cluster with distributed EC shards + params := &worker_pb.TaskParams{ + VolumeId: 100, + Sources: 
[]*worker_pb.TaskSource{ + { + Node: "volume1:8080", + Generation: 1, + ShardIds: []uint32{0, 1, 2, 3, 4}, + }, + { + Node: "volume2:8080", + Generation: 1, + ShardIds: []uint32{5, 6, 7, 8, 9}, + }, + { + Node: "volume3:8080", + Generation: 1, + ShardIds: []uint32{10, 11, 12, 13}, + }, + }, + } + + plan, err := logic.CreateVacuumPlan(100, "data", params) + if err != nil { + t.Fatalf("failed to create plan: %v", err) + } + + // Validate the plan makes sense + if plan.CurrentGeneration != 1 || plan.TargetGeneration != 2 { + t.Errorf("generation transition: expected 1->2, got %d->%d", + plan.CurrentGeneration, plan.TargetGeneration) + } + + // Validate shard distribution + err = logic.ValidateShardDistribution(plan.SourceDistribution) + if err != nil { + t.Errorf("invalid source distribution: %v", err) + } + + // All source nodes should become destination nodes + if len(plan.SourceDistribution.Nodes) != len(plan.ExpectedDistribution.Nodes) { + t.Errorf("source/destination node count mismatch: %d vs %d", + len(plan.SourceDistribution.Nodes), len(plan.ExpectedDistribution.Nodes)) + } + + t.Logf("Plan created successfully:") + t.Logf(" Volume: %d, Collection: %s", plan.VolumeID, plan.Collection) + t.Logf(" Generation: %d -> %d", plan.CurrentGeneration, plan.TargetGeneration) + t.Logf(" Nodes: %d", len(plan.SourceDistribution.Nodes)) + t.Logf(" Cleanup: %v", plan.GenerationsToCleanup) + t.Logf(" Safety checks: %d", len(plan.SafetyChecks)) +} + +func TestRealisticTopologyScenarios(t *testing.T) { + t.Run("3-node distributed shards", createRealisticTopologyTest) +} diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go new file mode 100644 index 000000000..fc8b0e1e7 --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go @@ -0,0 +1,582 @@ +package ec_vacuum + +import ( + "fmt" + "sort" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// TestTopologyBasedTaskGeneration tests generating EC vacuum tasks from different active topologies +func TestTopologyBasedTaskGeneration(t *testing.T) { + scenarios := []struct { + name string + topology TopologyScenario + expectTasks int + validate func(*testing.T, []*GeneratedTask) + }{ + { + name: "single_volume_distributed_shards", + topology: TopologyScenario{ + Volumes: []VolumeTopology{ + { + VolumeID: 100, + Collection: "data", + Generation: 0, + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + Size: 1000000, + DeletionRatio: 0.4, + }, + }, + }, + expectTasks: 1, + validate: func(t *testing.T, tasks []*GeneratedTask) { + task := tasks[0] + if task.VolumeID != 100 { + t.Errorf("volume ID: expected 100, got %d", task.VolumeID) + } + if len(task.SourceNodes) != 3 { + t.Errorf("source nodes: expected 3, got %d", len(task.SourceNodes)) + } + + // Verify all shards are accounted for + totalShards := 0 + for _, shards := range task.SourceNodes { + totalShards += len(shards) + } + if totalShards != 14 { + t.Errorf("total shards: expected 14, got %d", totalShards) + } + }, + }, + { + name: "multiple_volumes_different_generations", + topology: TopologyScenario{ + Volumes: []VolumeTopology{ + { + VolumeID: 200, + Generation: 0, + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + "node2:8080": {10, 11, 12, 13}, + }, + 
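					// 0.6 is well above the 0.3 deletion-ratio threshold checked by qualifiesForVacuum, so this volume should yield a task.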
DeletionRatio: 0.6, + }, + { + VolumeID: 201, + Generation: 2, + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + DeletionRatio: 0.5, + }, + }, + }, + expectTasks: 2, + validate: func(t *testing.T, tasks []*GeneratedTask) { + // Sort tasks by volume ID for predictable testing + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].VolumeID < tasks[j].VolumeID + }) + + // Validate volume 200 (generation 0 -> 1) + task0 := tasks[0] + if task0.SourceGeneration != 0 || task0.TargetGeneration != 1 { + t.Errorf("volume 200 generations: expected 0->1, got %d->%d", + task0.SourceGeneration, task0.TargetGeneration) + } + + // Validate volume 201 (generation 2 -> 3) + task1 := tasks[1] + if task1.SourceGeneration != 2 || task1.TargetGeneration != 3 { + t.Errorf("volume 201 generations: expected 2->3, got %d->%d", + task1.SourceGeneration, task1.TargetGeneration) + } + }, + }, + { + name: "unbalanced_shard_distribution", + topology: TopologyScenario{ + Volumes: []VolumeTopology{ + { + VolumeID: 300, + Generation: 1, + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // 11 shards + "node2:8080": {11, 12, 13}, // 3 shards + }, + DeletionRatio: 0.3, + }, + }, + }, + expectTasks: 1, + validate: func(t *testing.T, tasks []*GeneratedTask) { + task := tasks[0] + + // Verify unbalanced distribution is handled correctly + node1Shards := len(task.SourceNodes["node1:8080"]) + node2Shards := len(task.SourceNodes["node2:8080"]) + + if node1Shards != 11 { + t.Errorf("node1 shards: expected 11, got %d", node1Shards) + } + if node2Shards != 3 { + t.Errorf("node2 shards: expected 3, got %d", node2Shards) + } + + // Total should still be 14 + if node1Shards+node2Shards != 14 { + t.Errorf("total shards: expected 14, got %d", node1Shards+node2Shards) + } + }, + }, + { + name: "insufficient_shards_for_reconstruction", + topology: TopologyScenario{ + Volumes: []VolumeTopology{ + { + VolumeID: 400, + Generation: 0, + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2}, // Only 6 shards total < 10 required + "node2:8080": {3, 4, 5}, + }, + DeletionRatio: 0.8, + }, + }, + }, + expectTasks: 0, // Should not generate task due to insufficient shards + }, + } + + generator := NewTopologyTaskGenerator() + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + tasks, err := generator.GenerateEcVacuumTasks(scenario.topology) + if err != nil { + t.Fatalf("failed to generate tasks: %v", err) + } + + if len(tasks) != scenario.expectTasks { + t.Errorf("task count: expected %d, got %d", scenario.expectTasks, len(tasks)) + return + } + + if scenario.validate != nil { + scenario.validate(t, tasks) + } + }) + } +} + +// TestShardSelectionAndDeletion tests what shards are actually selected and deleted +func TestShardSelectionAndDeletion(t *testing.T) { + scenarios := []struct { + name string + initialState MultiGenerationState + expectedPlan ExpectedDeletionPlan + }{ + { + name: "single_generation_cleanup", + initialState: MultiGenerationState{ + VolumeID: 500, + Collection: "test", + Generations: map[uint32]GenerationData{ + 0: { + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4, 5}, + "node2:8080": {6, 7, 8, 9, 10, 11, 12, 13}, + }, + FilesOnDisk: []string{ + "test_500.ec00", "test_500.ec01", "test_500.ec02", "test_500.ec03", "test_500.ec04", "test_500.ec05", + "test_500.ec06", "test_500.ec07", "test_500.ec08", "test_500.ec09", 
"test_500.ec10", "test_500.ec11", "test_500.ec12", "test_500.ec13", + "test_500.ecx", "test_500.ecj", "test_500.vif", + }, + }, + }, + ActiveGeneration: 0, + }, + expectedPlan: ExpectedDeletionPlan{ + SourceGeneration: 0, + TargetGeneration: 1, + GenerationsToDelete: []uint32{0}, + ShardsToDeleteByNode: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4, 5}, + "node2:8080": {6, 7, 8, 9, 10, 11, 12, 13}, + }, + FilesToDeleteByNode: map[string][]string{ + "node1:8080": { + "test_500.ec00", "test_500.ec01", "test_500.ec02", "test_500.ec03", "test_500.ec04", "test_500.ec05", + "test_500.ecx", "test_500.ecj", "test_500.vif", + }, + "node2:8080": { + "test_500.ec06", "test_500.ec07", "test_500.ec08", "test_500.ec09", "test_500.ec10", "test_500.ec11", "test_500.ec12", "test_500.ec13", + }, + }, + ExpectedFilesAfterCleanup: []string{ + // New generation 1 files + "test_500_g1.ec00", "test_500_g1.ec01", "test_500_g1.ec02", "test_500_g1.ec03", "test_500_g1.ec04", "test_500_g1.ec05", + "test_500_g1.ec06", "test_500_g1.ec07", "test_500_g1.ec08", "test_500_g1.ec09", "test_500_g1.ec10", "test_500_g1.ec11", "test_500_g1.ec12", "test_500_g1.ec13", + "test_500_g1.ecx", "test_500_g1.ecj", "test_500_g1.vif", + }, + }, + }, + { + name: "multi_generation_cleanup", + initialState: MultiGenerationState{ + VolumeID: 600, + Collection: "data", + Generations: map[uint32]GenerationData{ + 0: { + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + FilesOnDisk: []string{ + "data_600.ec00", "data_600.ec01", "data_600.ec02", "data_600.ec03", "data_600.ec04", + "data_600.ec05", "data_600.ec06", "data_600.ec07", "data_600.ec08", "data_600.ec09", + "data_600.ec10", "data_600.ec11", "data_600.ec12", "data_600.ec13", + "data_600.ecx", "data_600.ecj", "data_600.vif", + }, + }, + 1: { + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + FilesOnDisk: []string{ + "data_600_g1.ec00", "data_600_g1.ec01", "data_600_g1.ec02", "data_600_g1.ec03", "data_600_g1.ec04", + "data_600_g1.ec05", "data_600_g1.ec06", "data_600_g1.ec07", "data_600_g1.ec08", "data_600_g1.ec09", + "data_600_g1.ec10", "data_600_g1.ec11", "data_600_g1.ec12", "data_600_g1.ec13", + "data_600_g1.ecx", "data_600_g1.ecj", "data_600_g1.vif", + }, + }, + 2: { + ShardDistribution: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + FilesOnDisk: []string{ + "data_600_g2.ec00", "data_600_g2.ec01", "data_600_g2.ec02", "data_600_g2.ec03", "data_600_g2.ec04", + "data_600_g2.ec05", "data_600_g2.ec06", "data_600_g2.ec07", "data_600_g2.ec08", "data_600_g2.ec09", + "data_600_g2.ec10", "data_600_g2.ec11", "data_600_g2.ec12", "data_600_g2.ec13", + "data_600_g2.ecx", "data_600_g2.ecj", "data_600_g2.vif", + }, + }, + }, + ActiveGeneration: 2, + }, + expectedPlan: ExpectedDeletionPlan{ + SourceGeneration: 2, + TargetGeneration: 3, + GenerationsToDelete: []uint32{2}, // Only current generation (0 and 1 should have been cleaned up in previous runs) + ShardsToDeleteByNode: map[string][]int{ + "node1:8080": {0, 1, 2, 3, 4}, + "node2:8080": {5, 6, 7, 8, 9}, + "node3:8080": {10, 11, 12, 13}, + }, + FilesToDeleteByNode: map[string][]string{ + "node1:8080": { + "data_600_g2.ec00", "data_600_g2.ec01", "data_600_g2.ec02", "data_600_g2.ec03", "data_600_g2.ec04", + "data_600_g2.ecx", "data_600_g2.ecj", "data_600_g2.vif", + }, + "node2:8080": { + 
"data_600_g2.ec05", "data_600_g2.ec06", "data_600_g2.ec07", "data_600_g2.ec08", "data_600_g2.ec09", + }, + "node3:8080": { + "data_600_g2.ec10", "data_600_g2.ec11", "data_600_g2.ec12", "data_600_g2.ec13", + }, + }, + ExpectedFilesAfterCleanup: []string{ + // Old generations should remain (they should have been cleaned up before) + "data_600.ec00", "data_600.ec01", "data_600.ec02", "data_600.ec03", "data_600.ec04", + "data_600.ec05", "data_600.ec06", "data_600.ec07", "data_600.ec08", "data_600.ec09", + "data_600.ec10", "data_600.ec11", "data_600.ec12", "data_600.ec13", + "data_600.ecx", "data_600.ecj", "data_600.vif", + "data_600_g1.ec00", "data_600_g1.ec01", "data_600_g1.ec02", "data_600_g1.ec03", "data_600_g1.ec04", + "data_600_g1.ec05", "data_600_g1.ec06", "data_600_g1.ec07", "data_600_g1.ec08", "data_600_g1.ec09", + "data_600_g1.ec10", "data_600_g1.ec11", "data_600_g1.ec12", "data_600_g1.ec13", + "data_600_g1.ecx", "data_600_g1.ecj", "data_600_g1.vif", + // New generation 3 files + "data_600_g3.ec00", "data_600_g3.ec01", "data_600_g3.ec02", "data_600_g3.ec03", "data_600_g3.ec04", + "data_600_g3.ec05", "data_600_g3.ec06", "data_600_g3.ec07", "data_600_g3.ec08", "data_600_g3.ec09", + "data_600_g3.ec10", "data_600_g3.ec11", "data_600_g3.ec12", "data_600_g3.ec13", + "data_600_g3.ecx", "data_600_g3.ecj", "data_600_g3.vif", + }, + }, + }, + } + + logic := NewEcVacuumLogic() + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // Convert multi-generation state to task parameters + params := convertMultiGenerationStateToParams(scenario.initialState) + + // Create vacuum plan + plan, err := logic.CreateVacuumPlan(scenario.initialState.VolumeID, scenario.initialState.Collection, params) + if err != nil { + t.Fatalf("failed to create plan: %v", err) + } + + // Validate generation transitions + if plan.CurrentGeneration != scenario.expectedPlan.SourceGeneration { + t.Errorf("source generation: expected %d, got %d", + scenario.expectedPlan.SourceGeneration, plan.CurrentGeneration) + } + + if plan.TargetGeneration != scenario.expectedPlan.TargetGeneration { + t.Errorf("target generation: expected %d, got %d", + scenario.expectedPlan.TargetGeneration, plan.TargetGeneration) + } + + // Validate cleanup generations + if !equalUint32Slices(plan.GenerationsToCleanup, scenario.expectedPlan.GenerationsToDelete) { + t.Errorf("cleanup generations: expected %v, got %v", + scenario.expectedPlan.GenerationsToDelete, plan.GenerationsToCleanup) + } + + // Validate shard distribution + for nodeAddr, expectedShards := range scenario.expectedPlan.ShardsToDeleteByNode { + shardBits, exists := plan.SourceDistribution.Nodes[pb.ServerAddress(nodeAddr)] + if !exists { + t.Errorf("expected node %s not found in plan", nodeAddr) + continue + } + + actualShards := shardBitsToSlice(shardBits) + if !equalIntSlices(actualShards, expectedShards) { + t.Errorf("node %s shards: expected %v, got %v", nodeAddr, expectedShards, actualShards) + } + } + + t.Logf("Plan validation successful:") + t.Logf(" Volume: %d (%s)", plan.VolumeID, plan.Collection) + t.Logf(" Generation transition: %d -> %d", plan.CurrentGeneration, plan.TargetGeneration) + t.Logf(" Cleanup generations: %v", plan.GenerationsToCleanup) + t.Logf(" Nodes affected: %d", len(plan.SourceDistribution.Nodes)) + + // Estimate cleanup impact + impact := logic.EstimateCleanupImpact(plan, 1000000) // 1MB volume + t.Logf(" Estimated impact: %d shards deleted, %d bytes freed", + impact.ShardsToDelete, impact.EstimatedSizeFreed) + }) + } +} + +// 
Test data structures for comprehensive testing +type VolumeTopology struct { + VolumeID uint32 + Collection string + Generation uint32 + ShardDistribution map[string][]int // node -> shard IDs + Size uint64 + DeletionRatio float64 +} + +type TopologyScenario struct { + Volumes []VolumeTopology +} + +type GenerationData struct { + ShardDistribution map[string][]int // node -> shard IDs + FilesOnDisk []string +} + +type MultiGenerationState struct { + VolumeID uint32 + Collection string + Generations map[uint32]GenerationData + ActiveGeneration uint32 +} + +type ExpectedDeletionPlan struct { + SourceGeneration uint32 + TargetGeneration uint32 + GenerationsToDelete []uint32 + ShardsToDeleteByNode map[string][]int + FilesToDeleteByNode map[string][]string + ExpectedFilesAfterCleanup []string +} + +type GeneratedTask struct { + VolumeID uint32 + Collection string + SourceGeneration uint32 + TargetGeneration uint32 + SourceNodes map[string][]int // node -> shard IDs +} + +type TopologyTaskGenerator struct { + logic *EcVacuumLogic +} + +func NewTopologyTaskGenerator() *TopologyTaskGenerator { + return &TopologyTaskGenerator{ + logic: NewEcVacuumLogic(), + } +} + +func (g *TopologyTaskGenerator) GenerateEcVacuumTasks(scenario TopologyScenario) ([]*GeneratedTask, error) { + var tasks []*GeneratedTask + + for _, volume := range scenario.Volumes { + // Check if volume qualifies for vacuum (sufficient shards + deletion ratio) + if !g.qualifiesForVacuum(volume) { + continue + } + + // Convert to task parameters + params := g.volumeTopologyToParams(volume) + + // Create plan using logic + plan, err := g.logic.CreateVacuumPlan(volume.VolumeID, volume.Collection, params) + if err != nil { + return nil, fmt.Errorf("failed to create plan for volume %d: %w", volume.VolumeID, err) + } + + // Convert plan to generated task + task := &GeneratedTask{ + VolumeID: plan.VolumeID, + Collection: plan.Collection, + SourceGeneration: plan.CurrentGeneration, + TargetGeneration: plan.TargetGeneration, + SourceNodes: make(map[string][]int), + } + + // Convert shard distribution + for node, shardBits := range plan.SourceDistribution.Nodes { + task.SourceNodes[string(node)] = shardBitsToSlice(shardBits) + } + + tasks = append(tasks, task) + } + + return tasks, nil +} + +func (g *TopologyTaskGenerator) qualifiesForVacuum(volume VolumeTopology) bool { + // Check deletion ratio threshold (minimum 0.3) + if volume.DeletionRatio < 0.3 { + return false + } + + // Check sufficient shards for reconstruction + totalShards := 0 + for _, shards := range volume.ShardDistribution { + totalShards += len(shards) + } + + return totalShards >= erasure_coding.DataShardsCount +} + +func (g *TopologyTaskGenerator) volumeTopologyToParams(volume VolumeTopology) *worker_pb.TaskParams { + var sources []*worker_pb.TaskSource + + for node, shardIds := range volume.ShardDistribution { + shardIds32 := make([]uint32, len(shardIds)) + for i, id := range shardIds { + shardIds32[i] = uint32(id) + } + + sources = append(sources, &worker_pb.TaskSource{ + Node: node, + VolumeId: volume.VolumeID, + ShardIds: shardIds32, + Generation: volume.Generation, + }) + } + + return &worker_pb.TaskParams{ + VolumeId: volume.VolumeID, + Sources: sources, + } +} + +// Helper functions +func convertMultiGenerationStateToParams(state MultiGenerationState) *worker_pb.TaskParams { + // Use active generation as source + activeData := state.Generations[state.ActiveGeneration] + + var sources []*worker_pb.TaskSource + for node, shardIds := range activeData.ShardDistribution { + 
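		// Build one TaskSource per node, converting the test's int shard IDs to the uint32 slice used by worker_pb.TaskSource.ShardIds.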
shardIds32 := make([]uint32, len(shardIds)) + for i, id := range shardIds { + shardIds32[i] = uint32(id) + } + + sources = append(sources, &worker_pb.TaskSource{ + Node: node, + VolumeId: state.VolumeID, + ShardIds: shardIds32, + Generation: state.ActiveGeneration, + }) + } + + return &worker_pb.TaskParams{ + VolumeId: state.VolumeID, + Sources: sources, + } +} + +func shardBitsToSlice(bits erasure_coding.ShardBits) []int { + var shards []int + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + if bits.HasShardId(erasure_coding.ShardId(i)) { + shards = append(shards, i) + } + } + return shards +} + +func equalUint32Slices(a, b []uint32) bool { + if len(a) != len(b) { + return false + } + sortedA := make([]uint32, len(a)) + sortedB := make([]uint32, len(b)) + copy(sortedA, a) + copy(sortedB, b) + sort.Slice(sortedA, func(i, j int) bool { return sortedA[i] < sortedA[j] }) + sort.Slice(sortedB, func(i, j int) bool { return sortedB[i] < sortedB[j] }) + + for i := range sortedA { + if sortedA[i] != sortedB[i] { + return false + } + } + return true +} + +func equalIntSlices(a, b []int) bool { + if len(a) != len(b) { + return false + } + sortedA := make([]int, len(a)) + sortedB := make([]int, len(b)) + copy(sortedA, a) + copy(sortedB, b) + sort.Ints(sortedA) + sort.Ints(sortedB) + + for i := range sortedA { + if sortedA[i] != sortedB[i] { + return false + } + } + return true +} diff --git a/weed/worker/tasks/ec_vacuum/safety_checks_test.go b/weed/worker/tasks/ec_vacuum/safety_checks_test.go index 188255de9..fa80c6ff2 100644 --- a/weed/worker/tasks/ec_vacuum/safety_checks_test.go +++ b/weed/worker/tasks/ec_vacuum/safety_checks_test.go @@ -293,7 +293,7 @@ func TestSafetyCheckNoActiveOperations(t *testing.T) { task := createSafetyTestTask() // Verify grace period is reasonable - assert.Equal(t, 5*time.Minute, task.cleanupGracePeriod, "Grace period should be 5 minutes") + assert.Equal(t, 1*time.Minute, task.cleanupGracePeriod, "Grace period should be 1 minute") // Test that grace period logic passes // In a real scenario, this would check for active operations
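
For illustration, a minimal usage sketch of how the new EcVacuumLogic methods compose (CreateVacuumPlan, ValidateShardDistribution, EstimateCleanupImpact), using the worker_pb types exercised by the tests above; the node addresses, volume ID, and volume size here are hypothetical.

package main

import (
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum"
)

func main() {
	logic := ec_vacuum.NewEcVacuumLogic()

	// Hypothetical generation-1 EC volume spread across two nodes (14 shards total).
	params := &worker_pb.TaskParams{
		VolumeId: 42,
		Sources: []*worker_pb.TaskSource{
			{Node: "volume1:8080", VolumeId: 42, Generation: 1, ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6}},
			{Node: "volume2:8080", VolumeId: 42, Generation: 1, ShardIds: []uint32{7, 8, 9, 10, 11, 12, 13}},
		},
	}

	plan, err := logic.CreateVacuumPlan(42, "data", params)
	if err != nil {
		log.Fatalf("create plan: %v", err)
	}

	// Refuse to proceed unless at least DataShardsCount shards are reachable.
	if err := logic.ValidateShardDistribution(plan.SourceDistribution); err != nil {
		log.Fatalf("shard distribution: %v", err)
	}

	impact := logic.EstimateCleanupImpact(plan, 1<<30) // assume a 1 GiB volume
	fmt.Printf("volume %d: generation %d -> %d, ~%d bytes freed across %d nodes\n",
		plan.VolumeID, plan.CurrentGeneration, plan.TargetGeneration,
		impact.EstimatedSizeFreed, impact.NodesAffected)
}

With generation-1 sources, the resulting plan targets generation 2 and marks generation 1 for cleanup once the new shards are in place, matching the behavior verified in TestDetermineGenerationsFromParams and TestCreateVacuumPlan.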