
adding EcVacuumLogic

add-ec-vacuum
chrislu 4 months ago
parent
commit 425a54019b
  1. weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go (4 lines changed)
  2. weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go (253 lines changed)
  3. weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go (687 lines changed)
  4. weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go (582 lines changed)
  5. weed/worker/tasks/ec_vacuum/safety_checks_test.go (2 lines changed)

weed/worker/tasks/ec_vacuum/ec_vacuum_generation_unit_test.go (4 lines changed)

@@ -237,6 +237,10 @@ func TestEcVacuumActivateNewGeneration(t *testing.T) {
	task := NewEcVacuumTask("activate-test", volumeId, collection, sourceNodes)
	// Set generations manually for this test (normally done by Execute via task parameters)
	task.sourceGeneration = 0
	task.targetGeneration = 1
	// Simulate the activation step
	ctx := context.Background()

weed/worker/tasks/ec_vacuum/ec_vacuum_logic.go (253 lines changed)

@@ -0,0 +1,253 @@
package ec_vacuum

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// EcVacuumLogic contains the core business logic for EC vacuum operations
// This is extracted from EcVacuumTask to make it easily testable
type EcVacuumLogic struct{}

// NewEcVacuumLogic creates a new instance of the core logic
func NewEcVacuumLogic() *EcVacuumLogic {
	return &EcVacuumLogic{}
}

// GenerationPlan represents a plan for generation transitions during vacuum
type GenerationPlan struct {
	VolumeID         uint32
	SourceGeneration uint32
	TargetGeneration uint32
	SourceNodes      map[pb.ServerAddress]erasure_coding.ShardBits
	CleanupPlan      []uint32 // Generations to be cleaned up
}

// ShardDistribution represents how shards are distributed across nodes
type ShardDistribution struct {
	Generation uint32
	Nodes      map[pb.ServerAddress]erasure_coding.ShardBits
}

// VacuumPlan represents the complete plan for an EC vacuum operation
type VacuumPlan struct {
	VolumeID             uint32
	Collection           string
	CurrentGeneration    uint32
	TargetGeneration     uint32
	SourceDistribution   ShardDistribution
	ExpectedDistribution ShardDistribution
	GenerationsToCleanup []uint32
	SafetyChecks         []string
}

// DetermineGenerationsFromParams extracts generation information from task parameters
func (logic *EcVacuumLogic) DetermineGenerationsFromParams(params *worker_pb.TaskParams) (sourceGen, targetGen uint32, err error) {
	if params == nil {
		return 0, 0, fmt.Errorf("task parameters cannot be nil")
	}
	if len(params.Sources) == 0 {
		// Fallback to safe defaults for backward compatibility
		return 0, 1, nil
	}
	// Use generation from first source (all sources should have same generation)
	if params.Sources[0].Generation > 0 {
		sourceGen = params.Sources[0].Generation
		targetGen = sourceGen + 1
	} else {
		// Generation 0 case
		sourceGen = 0
		targetGen = 1
	}
	// Validate consistency - all sources should have the same generation
	for i, source := range params.Sources {
		if source.Generation != sourceGen {
			return 0, 0, fmt.Errorf("inconsistent generations in sources: source[0]=%d, source[%d]=%d",
				sourceGen, i, source.Generation)
		}
	}
	return sourceGen, targetGen, nil
}

// ParseSourceNodes extracts source node information from task parameters
func (logic *EcVacuumLogic) ParseSourceNodes(params *worker_pb.TaskParams) (map[pb.ServerAddress]erasure_coding.ShardBits, error) {
	if params == nil {
		return nil, fmt.Errorf("task parameters cannot be nil")
	}
	sourceNodes := make(map[pb.ServerAddress]erasure_coding.ShardBits)
	for _, source := range params.Sources {
		if source.Node == "" {
			continue
		}
		serverAddr := pb.ServerAddress(source.Node)
		var shardBits erasure_coding.ShardBits
		// Convert shard IDs to ShardBits
		for _, shardId := range source.ShardIds {
			if shardId < erasure_coding.TotalShardsCount {
				shardBits = shardBits.AddShardId(erasure_coding.ShardId(shardId))
			}
		}
		if shardBits.ShardIdCount() > 0 {
			sourceNodes[serverAddr] = shardBits
		}
	}
	if len(sourceNodes) == 0 {
		return nil, fmt.Errorf("no valid source nodes found: sources=%d", len(params.Sources))
	}
	return sourceNodes, nil
}

// CreateVacuumPlan creates a comprehensive plan for the EC vacuum operation
func (logic *EcVacuumLogic) CreateVacuumPlan(volumeID uint32, collection string, params *worker_pb.TaskParams) (*VacuumPlan, error) {
	// Extract generations
	sourceGen, targetGen, err := logic.DetermineGenerationsFromParams(params)
	if err != nil {
		return nil, fmt.Errorf("failed to determine generations: %w", err)
	}
	// Parse source nodes
	sourceNodes, err := logic.ParseSourceNodes(params)
	if err != nil {
		return nil, fmt.Errorf("failed to parse source nodes: %w", err)
	}
	// Create source distribution
	sourceDistribution := ShardDistribution{
		Generation: sourceGen,
		Nodes:      sourceNodes,
	}
	// Expected distribution is same nodes but with target generation
	expectedDistribution := ShardDistribution{
		Generation: targetGen,
		Nodes:      sourceNodes, // Same nodes, new generation
	}
	// Determine what to cleanup (simplified: just source generation)
	generationsToCleanup := []uint32{sourceGen}
	// Generate safety checks
	safetyChecks := logic.generateSafetyChecks(sourceDistribution, targetGen)
	return &VacuumPlan{
		VolumeID:             volumeID,
		Collection:           collection,
		CurrentGeneration:    sourceGen,
		TargetGeneration:     targetGen,
		SourceDistribution:   sourceDistribution,
		ExpectedDistribution: expectedDistribution,
		GenerationsToCleanup: generationsToCleanup,
		SafetyChecks:         safetyChecks,
	}, nil
}

// ValidateShardDistribution validates that the shard distribution is sufficient for vacuum
func (logic *EcVacuumLogic) ValidateShardDistribution(distribution ShardDistribution) error {
	totalShards := erasure_coding.ShardBits(0)
	for _, shardBits := range distribution.Nodes {
		totalShards = totalShards.Plus(shardBits)
	}
	shardCount := totalShards.ShardIdCount()
	if shardCount < erasure_coding.DataShardsCount {
		return fmt.Errorf("insufficient shards for reconstruction: have %d, need at least %d",
			shardCount, erasure_coding.DataShardsCount)
	}
	return nil
}

// CalculateCleanupGenerations determines which generations should be cleaned up
func (logic *EcVacuumLogic) CalculateCleanupGenerations(currentGen, targetGen uint32, availableGenerations []uint32) []uint32 {
	var toCleanup []uint32
	for _, gen := range availableGenerations {
		// Don't clean up the target generation
		if gen != targetGen {
			toCleanup = append(toCleanup, gen)
		}
	}
	return toCleanup
}

// generateSafetyChecks creates a list of safety checks for the vacuum plan
func (logic *EcVacuumLogic) generateSafetyChecks(distribution ShardDistribution, targetGen uint32) []string {
	var checks []string
	// Check 1: Sufficient shards
	totalShards := erasure_coding.ShardBits(0)
	for _, shardBits := range distribution.Nodes {
		totalShards = totalShards.Plus(shardBits)
	}
	checks = append(checks, fmt.Sprintf("Total shards available: %d/%d",
		totalShards.ShardIdCount(), erasure_coding.TotalShardsCount))
	// Check 2: Minimum data shards
	if totalShards.ShardIdCount() >= erasure_coding.DataShardsCount {
		checks = append(checks, "✅ Sufficient data shards for reconstruction")
	} else {
		checks = append(checks, "❌ INSUFFICIENT data shards for reconstruction")
	}
	// Check 3: Node distribution
	checks = append(checks, fmt.Sprintf("Shard distribution across %d nodes", len(distribution.Nodes)))
	// Check 4: Generation safety
	checks = append(checks, fmt.Sprintf("Target generation %d != source generation %d",
		targetGen, distribution.Generation))
	return checks
}

// EstimateCleanupImpact estimates the storage impact of cleanup operations
func (logic *EcVacuumLogic) EstimateCleanupImpact(plan *VacuumPlan, volumeSize uint64) CleanupImpact {
	// Estimate size per generation
	sizePerGeneration := volumeSize
	// Calculate total cleanup impact
	var totalCleanupSize uint64
	for range plan.GenerationsToCleanup {
		totalCleanupSize += sizePerGeneration
	}
	return CleanupImpact{
		GenerationsToCleanup: len(plan.GenerationsToCleanup),
		EstimatedSizeFreed:   totalCleanupSize,
		NodesAffected:        len(plan.SourceDistribution.Nodes),
		ShardsToDelete:       logic.countShardsToDelete(plan),
	}
}

// CleanupImpact represents the estimated impact of cleanup operations
type CleanupImpact struct {
	GenerationsToCleanup int
	EstimatedSizeFreed   uint64
	NodesAffected        int
	ShardsToDelete       int
}

// countShardsToDelete counts how many shard files will be deleted
func (logic *EcVacuumLogic) countShardsToDelete(plan *VacuumPlan) int {
	totalShards := 0
	for _, shardBits := range plan.SourceDistribution.Nodes {
		totalShards += shardBits.ShardIdCount()
	}
	return totalShards * len(plan.GenerationsToCleanup)
}
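
For context (not part of this commit): a minimal sketch of how the new EcVacuumLogic is meant to be driven, using only the types and methods added above and assuming the package's existing fmt and worker_pb imports. The values are illustrative; the // Output line follows from the code as written.

// Illustrative usage sketch (not in the commit): build TaskParams from the
// shard topology, create a vacuum plan, validate it, then estimate the impact.
func ExampleEcVacuumLogic_CreateVacuumPlan() {
	logic := NewEcVacuumLogic()

	// Three nodes holding all 14 shards of volume 100 at generation 1.
	params := &worker_pb.TaskParams{
		VolumeId: 100,
		Sources: []*worker_pb.TaskSource{
			{Node: "volume1:8080", Generation: 1, ShardIds: []uint32{0, 1, 2, 3, 4}},
			{Node: "volume2:8080", Generation: 1, ShardIds: []uint32{5, 6, 7, 8, 9}},
			{Node: "volume3:8080", Generation: 1, ShardIds: []uint32{10, 11, 12, 13}},
		},
	}

	plan, err := logic.CreateVacuumPlan(100, "data", params)
	if err != nil {
		fmt.Println("plan error:", err)
		return
	}
	if err := logic.ValidateShardDistribution(plan.SourceDistribution); err != nil {
		fmt.Println("validation error:", err)
		return
	}

	impact := logic.EstimateCleanupImpact(plan, 1000000)
	fmt.Println(plan.CurrentGeneration, "->", plan.TargetGeneration, impact.ShardsToDelete)
	// Output: 1 -> 2 14
}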

weed/worker/tasks/ec_vacuum/ec_vacuum_logic_test.go (687 lines changed)

@@ -0,0 +1,687 @@
package ec_vacuum
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
func TestDetermineGenerationsFromParams(t *testing.T) {
logic := NewEcVacuumLogic()
tests := []struct {
name string
params *worker_pb.TaskParams
expectSrc uint32
expectTgt uint32
expectError bool
}{
{
name: "nil params",
params: nil,
expectError: true,
},
{
name: "empty sources - fallback to defaults",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{},
},
expectSrc: 0,
expectTgt: 1,
},
{
name: "generation 0 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 0},
},
},
expectSrc: 0,
expectTgt: 1,
},
{
name: "generation 1 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 1},
},
},
expectSrc: 1,
expectTgt: 2,
},
{
name: "generation 5 source",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 5},
},
},
expectSrc: 5,
expectTgt: 6,
},
{
name: "inconsistent generations",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 1},
{Generation: 2}, // Different generation!
},
},
expectError: true,
},
{
name: "multiple sources same generation",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 3},
{Generation: 3},
{Generation: 3},
},
},
expectSrc: 3,
expectTgt: 4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srcGen, tgtGen, err := logic.DetermineGenerationsFromParams(tt.params)
if tt.expectError {
if err == nil {
t.Errorf("expected error but got none")
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
if srcGen != tt.expectSrc {
t.Errorf("source generation: expected %d, got %d", tt.expectSrc, srcGen)
}
if tgtGen != tt.expectTgt {
t.Errorf("target generation: expected %d, got %d", tt.expectTgt, tgtGen)
}
})
}
}
func TestParseSourceNodes(t *testing.T) {
logic := NewEcVacuumLogic()
tests := []struct {
name string
params *worker_pb.TaskParams
expectNodes int
expectShards map[string][]int // node -> shard IDs
expectError bool
}{
{
name: "nil params",
params: nil,
expectError: true,
},
{
name: "empty sources",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{},
},
expectError: true,
},
{
name: "single node with shards",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
ShardIds: []uint32{0, 1, 2, 3, 4, 5},
},
},
},
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5},
},
},
{
name: "multiple nodes with different shards",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
ShardIds: []uint32{0, 1, 2, 3, 4},
},
{
Node: "node2:8080",
ShardIds: []uint32{5, 6, 7, 8, 9},
},
{
Node: "node3:8080",
ShardIds: []uint32{10, 11, 12, 13},
},
},
},
expectNodes: 3,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
},
{
name: "overlapping shards across nodes",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
ShardIds: []uint32{0, 1, 2},
},
{
Node: "node2:8080",
ShardIds: []uint32{0, 3, 4}, // Shard 0 is on both nodes
},
},
},
expectNodes: 2,
expectShards: map[string][]int{
"node1:8080": {0, 1, 2},
"node2:8080": {0, 3, 4},
},
},
{
name: "empty node name ignored",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "",
ShardIds: []uint32{0, 1, 2},
},
{
Node: "node1:8080",
ShardIds: []uint32{3, 4, 5},
},
},
},
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {3, 4, 5},
},
},
{
name: "invalid shard IDs filtered out",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
ShardIds: []uint32{0, 1, 14, 15, 100}, // 14+ are invalid
},
},
},
expectNodes: 1,
expectShards: map[string][]int{
"node1:8080": {0, 1}, // Only valid shards
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sourceNodes, err := logic.ParseSourceNodes(tt.params)
if tt.expectError {
if err == nil {
t.Errorf("expected error but got none")
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
if len(sourceNodes) != tt.expectNodes {
t.Errorf("node count: expected %d, got %d", tt.expectNodes, len(sourceNodes))
return
}
// Verify shard distribution
for nodeAddr, expectedShardIds := range tt.expectShards {
shardBits, exists := sourceNodes[pb.ServerAddress(nodeAddr)]
if !exists {
t.Errorf("expected node %s not found", nodeAddr)
continue
}
// Convert ShardBits back to slice for comparison
var actualShardIds []int
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if shardBits.HasShardId(erasure_coding.ShardId(i)) {
actualShardIds = append(actualShardIds, i)
}
}
if len(actualShardIds) != len(expectedShardIds) {
t.Errorf("node %s shard count: expected %d, got %d",
nodeAddr, len(expectedShardIds), len(actualShardIds))
continue
}
// Check each expected shard
for _, expectedId := range expectedShardIds {
found := false
for _, actualId := range actualShardIds {
if actualId == expectedId {
found = true
break
}
}
if !found {
t.Errorf("node %s missing expected shard %d", nodeAddr, expectedId)
}
}
}
})
}
}
func TestValidateShardDistribution(t *testing.T) {
logic := NewEcVacuumLogic()
tests := []struct {
name string
distribution ShardDistribution
expectError bool
description string
}{
{
name: "sufficient shards for reconstruction",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2, 3, 4}),
"node2:8080": createShardBits([]int{5, 6, 7, 8, 9}),
},
},
expectError: false,
description: "10 shards >= 10 data shards required",
},
{
name: "exactly minimum data shards",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2, 3, 4}),
"node2:8080": createShardBits([]int{5, 6, 7, 8, 9}),
},
},
expectError: false,
description: "Exactly 10 data shards",
},
{
name: "insufficient shards",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2}),
"node2:8080": createShardBits([]int{3, 4, 5}),
},
},
expectError: true,
description: "Only 6 shards < 10 data shards required",
},
{
name: "all shards available",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2, 3, 4}),
"node2:8080": createShardBits([]int{5, 6, 7, 8, 9}),
"node3:8080": createShardBits([]int{10, 11, 12, 13}),
},
},
expectError: false,
description: "All 14 shards available",
},
{
name: "single node with all shards",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
},
},
expectError: false,
description: "All shards on single node",
},
{
name: "empty distribution",
distribution: ShardDistribution{
Generation: 1,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{},
},
expectError: true,
description: "No shards available",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := logic.ValidateShardDistribution(tt.distribution)
if tt.expectError && err == nil {
t.Errorf("expected error for %s but got none", tt.description)
}
if !tt.expectError && err != nil {
t.Errorf("unexpected error for %s: %v", tt.description, err)
}
})
}
}
func TestCreateVacuumPlan(t *testing.T) {
logic := NewEcVacuumLogic()
tests := []struct {
name string
volumeID uint32
collection string
params *worker_pb.TaskParams
expectError bool
validate func(*testing.T, *VacuumPlan)
}{
{
name: "basic generation 0 to 1 plan",
volumeID: 123,
collection: "test",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
Generation: 0,
ShardIds: []uint32{0, 1, 2, 3, 4, 5},
},
{
Node: "node2:8080",
Generation: 0,
ShardIds: []uint32{6, 7, 8, 9, 10, 11, 12, 13},
},
},
},
validate: func(t *testing.T, plan *VacuumPlan) {
if plan.VolumeID != 123 {
t.Errorf("volume ID: expected 123, got %d", plan.VolumeID)
}
if plan.Collection != "test" {
t.Errorf("collection: expected 'test', got '%s'", plan.Collection)
}
if plan.CurrentGeneration != 0 {
t.Errorf("current generation: expected 0, got %d", plan.CurrentGeneration)
}
if plan.TargetGeneration != 1 {
t.Errorf("target generation: expected 1, got %d", plan.TargetGeneration)
}
if len(plan.GenerationsToCleanup) != 1 || plan.GenerationsToCleanup[0] != 0 {
t.Errorf("cleanup generations: expected [0], got %v", plan.GenerationsToCleanup)
}
if len(plan.SourceDistribution.Nodes) != 2 {
t.Errorf("source nodes: expected 2, got %d", len(plan.SourceDistribution.Nodes))
}
if len(plan.ExpectedDistribution.Nodes) != 2 {
t.Errorf("expected nodes: expected 2, got %d", len(plan.ExpectedDistribution.Nodes))
}
},
},
{
name: "generation 3 to 4 plan",
volumeID: 456,
collection: "data",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{
Node: "node1:8080",
Generation: 3,
ShardIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
{
Node: "node2:8080",
Generation: 3,
ShardIds: []uint32{10, 11, 12, 13},
},
},
},
validate: func(t *testing.T, plan *VacuumPlan) {
if plan.CurrentGeneration != 3 {
t.Errorf("current generation: expected 3, got %d", plan.CurrentGeneration)
}
if plan.TargetGeneration != 4 {
t.Errorf("target generation: expected 4, got %d", plan.TargetGeneration)
}
if len(plan.GenerationsToCleanup) != 1 || plan.GenerationsToCleanup[0] != 3 {
t.Errorf("cleanup generations: expected [3], got %v", plan.GenerationsToCleanup)
}
},
},
{
name: "inconsistent generations",
volumeID: 789,
collection: "test",
params: &worker_pb.TaskParams{
Sources: []*worker_pb.TaskSource{
{Generation: 1},
{Generation: 2},
},
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plan, err := logic.CreateVacuumPlan(tt.volumeID, tt.collection, tt.params)
if tt.expectError {
if err == nil {
t.Errorf("expected error but got none")
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
if tt.validate != nil {
tt.validate(t, plan)
}
})
}
}
func TestCalculateCleanupGenerations(t *testing.T) {
logic := NewEcVacuumLogic()
tests := []struct {
name string
currentGen uint32
targetGen uint32
availableGenerations []uint32
expectedCleanup []uint32
}{
{
name: "single generation cleanup",
currentGen: 0,
targetGen: 1,
availableGenerations: []uint32{0, 1},
expectedCleanup: []uint32{0}, // Don't cleanup target generation 1
},
{
name: "multiple generations cleanup",
currentGen: 2,
targetGen: 3,
availableGenerations: []uint32{0, 1, 2, 3},
expectedCleanup: []uint32{0, 1, 2}, // Don't cleanup target generation 3
},
{
name: "no cleanup needed",
currentGen: 0,
targetGen: 1,
availableGenerations: []uint32{1},
expectedCleanup: []uint32{}, // Only target generation exists
},
{
name: "cleanup all except target",
currentGen: 5,
targetGen: 6,
availableGenerations: []uint32{0, 1, 2, 3, 4, 5, 6},
expectedCleanup: []uint32{0, 1, 2, 3, 4, 5}, // Don't cleanup target generation 6
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := logic.CalculateCleanupGenerations(tt.currentGen, tt.targetGen, tt.availableGenerations)
if len(result) != len(tt.expectedCleanup) {
t.Errorf("cleanup generations length: expected %d, got %d", len(tt.expectedCleanup), len(result))
return
}
// Convert to map for easier comparison
expectedMap := make(map[uint32]bool)
for _, gen := range tt.expectedCleanup {
expectedMap[gen] = true
}
for _, gen := range result {
if !expectedMap[gen] {
t.Errorf("unexpected generation in cleanup: %d", gen)
}
delete(expectedMap, gen)
}
// Check for missing generations
for gen := range expectedMap {
t.Errorf("missing generation in cleanup: %d", gen)
}
})
}
}
func TestEstimateCleanupImpact(t *testing.T) {
logic := NewEcVacuumLogic()
plan := &VacuumPlan{
VolumeID: 123,
CurrentGeneration: 2,
TargetGeneration: 3,
SourceDistribution: ShardDistribution{
Generation: 2,
Nodes: map[pb.ServerAddress]erasure_coding.ShardBits{
"node1:8080": createShardBits([]int{0, 1, 2, 3, 4}),
"node2:8080": createShardBits([]int{5, 6, 7, 8, 9}),
"node3:8080": createShardBits([]int{10, 11, 12, 13}),
},
},
GenerationsToCleanup: []uint32{0, 1, 2}, // 3 generations to cleanup
}
volumeSize := uint64(1000000) // 1MB
impact := logic.EstimateCleanupImpact(plan, volumeSize)
if impact.GenerationsToCleanup != 3 {
t.Errorf("generations to cleanup: expected 3, got %d", impact.GenerationsToCleanup)
}
if impact.EstimatedSizeFreed != 3000000 { // 3 generations * 1MB each
t.Errorf("estimated size freed: expected 3000000, got %d", impact.EstimatedSizeFreed)
}
if impact.NodesAffected != 3 {
t.Errorf("nodes affected: expected 3, got %d", impact.NodesAffected)
}
expectedShardsToDelete := (5 + 5 + 4) * 3 // Total shards per generation * generations
if impact.ShardsToDelete != expectedShardsToDelete {
t.Errorf("shards to delete: expected %d, got %d", expectedShardsToDelete, impact.ShardsToDelete)
}
}
// Helper function to create ShardBits from shard ID slice
func createShardBits(shardIds []int) erasure_coding.ShardBits {
var bits erasure_coding.ShardBits
for _, id := range shardIds {
bits = bits.AddShardId(erasure_coding.ShardId(id))
}
return bits
}
// Test helper to create realistic topology scenarios
func createRealisticTopologyTest(t *testing.T) {
logic := NewEcVacuumLogic()
// Scenario: 3-node cluster with distributed EC shards
params := &worker_pb.TaskParams{
VolumeId: 100,
Sources: []*worker_pb.TaskSource{
{
Node: "volume1:8080",
Generation: 1,
ShardIds: []uint32{0, 1, 2, 3, 4},
},
{
Node: "volume2:8080",
Generation: 1,
ShardIds: []uint32{5, 6, 7, 8, 9},
},
{
Node: "volume3:8080",
Generation: 1,
ShardIds: []uint32{10, 11, 12, 13},
},
},
}
plan, err := logic.CreateVacuumPlan(100, "data", params)
if err != nil {
t.Fatalf("failed to create plan: %v", err)
}
// Validate the plan makes sense
if plan.CurrentGeneration != 1 || plan.TargetGeneration != 2 {
t.Errorf("generation transition: expected 1->2, got %d->%d",
plan.CurrentGeneration, plan.TargetGeneration)
}
// Validate shard distribution
err = logic.ValidateShardDistribution(plan.SourceDistribution)
if err != nil {
t.Errorf("invalid source distribution: %v", err)
}
// All source nodes should become destination nodes
if len(plan.SourceDistribution.Nodes) != len(plan.ExpectedDistribution.Nodes) {
t.Errorf("source/destination node count mismatch: %d vs %d",
len(plan.SourceDistribution.Nodes), len(plan.ExpectedDistribution.Nodes))
}
t.Logf("Plan created successfully:")
t.Logf(" Volume: %d, Collection: %s", plan.VolumeID, plan.Collection)
t.Logf(" Generation: %d -> %d", plan.CurrentGeneration, plan.TargetGeneration)
t.Logf(" Nodes: %d", len(plan.SourceDistribution.Nodes))
t.Logf(" Cleanup: %v", plan.GenerationsToCleanup)
t.Logf(" Safety checks: %d", len(plan.SafetyChecks))
}
func TestRealisticTopologyScenarios(t *testing.T) {
t.Run("3-node distributed shards", createRealisticTopologyTest)
}

weed/worker/tasks/ec_vacuum/ec_vacuum_scenarios_test.go (582 lines changed)

@@ -0,0 +1,582 @@
package ec_vacuum
import (
"fmt"
"sort"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// TestTopologyBasedTaskGeneration tests generating EC vacuum tasks from different active topologies
func TestTopologyBasedTaskGeneration(t *testing.T) {
scenarios := []struct {
name string
topology TopologyScenario
expectTasks int
validate func(*testing.T, []*GeneratedTask)
}{
{
name: "single_volume_distributed_shards",
topology: TopologyScenario{
Volumes: []VolumeTopology{
{
VolumeID: 100,
Collection: "data",
Generation: 0,
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
Size: 1000000,
DeletionRatio: 0.4,
},
},
},
expectTasks: 1,
validate: func(t *testing.T, tasks []*GeneratedTask) {
task := tasks[0]
if task.VolumeID != 100 {
t.Errorf("volume ID: expected 100, got %d", task.VolumeID)
}
if len(task.SourceNodes) != 3 {
t.Errorf("source nodes: expected 3, got %d", len(task.SourceNodes))
}
// Verify all shards are accounted for
totalShards := 0
for _, shards := range task.SourceNodes {
totalShards += len(shards)
}
if totalShards != 14 {
t.Errorf("total shards: expected 14, got %d", totalShards)
}
},
},
{
name: "multiple_volumes_different_generations",
topology: TopologyScenario{
Volumes: []VolumeTopology{
{
VolumeID: 200,
Generation: 0,
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
"node2:8080": {10, 11, 12, 13},
},
DeletionRatio: 0.6,
},
{
VolumeID: 201,
Generation: 2,
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
DeletionRatio: 0.5,
},
},
},
expectTasks: 2,
validate: func(t *testing.T, tasks []*GeneratedTask) {
// Sort tasks by volume ID for predictable testing
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].VolumeID < tasks[j].VolumeID
})
// Validate volume 200 (generation 0 -> 1)
task0 := tasks[0]
if task0.SourceGeneration != 0 || task0.TargetGeneration != 1 {
t.Errorf("volume 200 generations: expected 0->1, got %d->%d",
task0.SourceGeneration, task0.TargetGeneration)
}
// Validate volume 201 (generation 2 -> 3)
task1 := tasks[1]
if task1.SourceGeneration != 2 || task1.TargetGeneration != 3 {
t.Errorf("volume 201 generations: expected 2->3, got %d->%d",
task1.SourceGeneration, task1.TargetGeneration)
}
},
},
{
name: "unbalanced_shard_distribution",
topology: TopologyScenario{
Volumes: []VolumeTopology{
{
VolumeID: 300,
Generation: 1,
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // 11 shards
"node2:8080": {11, 12, 13}, // 3 shards
},
DeletionRatio: 0.3,
},
},
},
expectTasks: 1,
validate: func(t *testing.T, tasks []*GeneratedTask) {
task := tasks[0]
// Verify unbalanced distribution is handled correctly
node1Shards := len(task.SourceNodes["node1:8080"])
node2Shards := len(task.SourceNodes["node2:8080"])
if node1Shards != 11 {
t.Errorf("node1 shards: expected 11, got %d", node1Shards)
}
if node2Shards != 3 {
t.Errorf("node2 shards: expected 3, got %d", node2Shards)
}
// Total should still be 14
if node1Shards+node2Shards != 14 {
t.Errorf("total shards: expected 14, got %d", node1Shards+node2Shards)
}
},
},
{
name: "insufficient_shards_for_reconstruction",
topology: TopologyScenario{
Volumes: []VolumeTopology{
{
VolumeID: 400,
Generation: 0,
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2}, // Only 6 shards total < 10 required
"node2:8080": {3, 4, 5},
},
DeletionRatio: 0.8,
},
},
},
expectTasks: 0, // Should not generate task due to insufficient shards
},
}
generator := NewTopologyTaskGenerator()
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
tasks, err := generator.GenerateEcVacuumTasks(scenario.topology)
if err != nil {
t.Fatalf("failed to generate tasks: %v", err)
}
if len(tasks) != scenario.expectTasks {
t.Errorf("task count: expected %d, got %d", scenario.expectTasks, len(tasks))
return
}
if scenario.validate != nil {
scenario.validate(t, tasks)
}
})
}
}
// TestShardSelectionAndDeletion tests what shards are actually selected and deleted
func TestShardSelectionAndDeletion(t *testing.T) {
scenarios := []struct {
name string
initialState MultiGenerationState
expectedPlan ExpectedDeletionPlan
}{
{
name: "single_generation_cleanup",
initialState: MultiGenerationState{
VolumeID: 500,
Collection: "test",
Generations: map[uint32]GenerationData{
0: {
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5},
"node2:8080": {6, 7, 8, 9, 10, 11, 12, 13},
},
FilesOnDisk: []string{
"test_500.ec00", "test_500.ec01", "test_500.ec02", "test_500.ec03", "test_500.ec04", "test_500.ec05",
"test_500.ec06", "test_500.ec07", "test_500.ec08", "test_500.ec09", "test_500.ec10", "test_500.ec11", "test_500.ec12", "test_500.ec13",
"test_500.ecx", "test_500.ecj", "test_500.vif",
},
},
},
ActiveGeneration: 0,
},
expectedPlan: ExpectedDeletionPlan{
SourceGeneration: 0,
TargetGeneration: 1,
GenerationsToDelete: []uint32{0},
ShardsToDeleteByNode: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4, 5},
"node2:8080": {6, 7, 8, 9, 10, 11, 12, 13},
},
FilesToDeleteByNode: map[string][]string{
"node1:8080": {
"test_500.ec00", "test_500.ec01", "test_500.ec02", "test_500.ec03", "test_500.ec04", "test_500.ec05",
"test_500.ecx", "test_500.ecj", "test_500.vif",
},
"node2:8080": {
"test_500.ec06", "test_500.ec07", "test_500.ec08", "test_500.ec09", "test_500.ec10", "test_500.ec11", "test_500.ec12", "test_500.ec13",
},
},
ExpectedFilesAfterCleanup: []string{
// New generation 1 files
"test_500_g1.ec00", "test_500_g1.ec01", "test_500_g1.ec02", "test_500_g1.ec03", "test_500_g1.ec04", "test_500_g1.ec05",
"test_500_g1.ec06", "test_500_g1.ec07", "test_500_g1.ec08", "test_500_g1.ec09", "test_500_g1.ec10", "test_500_g1.ec11", "test_500_g1.ec12", "test_500_g1.ec13",
"test_500_g1.ecx", "test_500_g1.ecj", "test_500_g1.vif",
},
},
},
{
name: "multi_generation_cleanup",
initialState: MultiGenerationState{
VolumeID: 600,
Collection: "data",
Generations: map[uint32]GenerationData{
0: {
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
FilesOnDisk: []string{
"data_600.ec00", "data_600.ec01", "data_600.ec02", "data_600.ec03", "data_600.ec04",
"data_600.ec05", "data_600.ec06", "data_600.ec07", "data_600.ec08", "data_600.ec09",
"data_600.ec10", "data_600.ec11", "data_600.ec12", "data_600.ec13",
"data_600.ecx", "data_600.ecj", "data_600.vif",
},
},
1: {
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
FilesOnDisk: []string{
"data_600_g1.ec00", "data_600_g1.ec01", "data_600_g1.ec02", "data_600_g1.ec03", "data_600_g1.ec04",
"data_600_g1.ec05", "data_600_g1.ec06", "data_600_g1.ec07", "data_600_g1.ec08", "data_600_g1.ec09",
"data_600_g1.ec10", "data_600_g1.ec11", "data_600_g1.ec12", "data_600_g1.ec13",
"data_600_g1.ecx", "data_600_g1.ecj", "data_600_g1.vif",
},
},
2: {
ShardDistribution: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
FilesOnDisk: []string{
"data_600_g2.ec00", "data_600_g2.ec01", "data_600_g2.ec02", "data_600_g2.ec03", "data_600_g2.ec04",
"data_600_g2.ec05", "data_600_g2.ec06", "data_600_g2.ec07", "data_600_g2.ec08", "data_600_g2.ec09",
"data_600_g2.ec10", "data_600_g2.ec11", "data_600_g2.ec12", "data_600_g2.ec13",
"data_600_g2.ecx", "data_600_g2.ecj", "data_600_g2.vif",
},
},
},
ActiveGeneration: 2,
},
expectedPlan: ExpectedDeletionPlan{
SourceGeneration: 2,
TargetGeneration: 3,
GenerationsToDelete: []uint32{2}, // Only current generation (0 and 1 should have been cleaned up in previous runs)
ShardsToDeleteByNode: map[string][]int{
"node1:8080": {0, 1, 2, 3, 4},
"node2:8080": {5, 6, 7, 8, 9},
"node3:8080": {10, 11, 12, 13},
},
FilesToDeleteByNode: map[string][]string{
"node1:8080": {
"data_600_g2.ec00", "data_600_g2.ec01", "data_600_g2.ec02", "data_600_g2.ec03", "data_600_g2.ec04",
"data_600_g2.ecx", "data_600_g2.ecj", "data_600_g2.vif",
},
"node2:8080": {
"data_600_g2.ec05", "data_600_g2.ec06", "data_600_g2.ec07", "data_600_g2.ec08", "data_600_g2.ec09",
},
"node3:8080": {
"data_600_g2.ec10", "data_600_g2.ec11", "data_600_g2.ec12", "data_600_g2.ec13",
},
},
ExpectedFilesAfterCleanup: []string{
// Old generations should remain (they should have been cleaned up before)
"data_600.ec00", "data_600.ec01", "data_600.ec02", "data_600.ec03", "data_600.ec04",
"data_600.ec05", "data_600.ec06", "data_600.ec07", "data_600.ec08", "data_600.ec09",
"data_600.ec10", "data_600.ec11", "data_600.ec12", "data_600.ec13",
"data_600.ecx", "data_600.ecj", "data_600.vif",
"data_600_g1.ec00", "data_600_g1.ec01", "data_600_g1.ec02", "data_600_g1.ec03", "data_600_g1.ec04",
"data_600_g1.ec05", "data_600_g1.ec06", "data_600_g1.ec07", "data_600_g1.ec08", "data_600_g1.ec09",
"data_600_g1.ec10", "data_600_g1.ec11", "data_600_g1.ec12", "data_600_g1.ec13",
"data_600_g1.ecx", "data_600_g1.ecj", "data_600_g1.vif",
// New generation 3 files
"data_600_g3.ec00", "data_600_g3.ec01", "data_600_g3.ec02", "data_600_g3.ec03", "data_600_g3.ec04",
"data_600_g3.ec05", "data_600_g3.ec06", "data_600_g3.ec07", "data_600_g3.ec08", "data_600_g3.ec09",
"data_600_g3.ec10", "data_600_g3.ec11", "data_600_g3.ec12", "data_600_g3.ec13",
"data_600_g3.ecx", "data_600_g3.ecj", "data_600_g3.vif",
},
},
},
}
logic := NewEcVacuumLogic()
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
// Convert multi-generation state to task parameters
params := convertMultiGenerationStateToParams(scenario.initialState)
// Create vacuum plan
plan, err := logic.CreateVacuumPlan(scenario.initialState.VolumeID, scenario.initialState.Collection, params)
if err != nil {
t.Fatalf("failed to create plan: %v", err)
}
// Validate generation transitions
if plan.CurrentGeneration != scenario.expectedPlan.SourceGeneration {
t.Errorf("source generation: expected %d, got %d",
scenario.expectedPlan.SourceGeneration, plan.CurrentGeneration)
}
if plan.TargetGeneration != scenario.expectedPlan.TargetGeneration {
t.Errorf("target generation: expected %d, got %d",
scenario.expectedPlan.TargetGeneration, plan.TargetGeneration)
}
// Validate cleanup generations
if !equalUint32Slices(plan.GenerationsToCleanup, scenario.expectedPlan.GenerationsToDelete) {
t.Errorf("cleanup generations: expected %v, got %v",
scenario.expectedPlan.GenerationsToDelete, plan.GenerationsToCleanup)
}
// Validate shard distribution
for nodeAddr, expectedShards := range scenario.expectedPlan.ShardsToDeleteByNode {
shardBits, exists := plan.SourceDistribution.Nodes[pb.ServerAddress(nodeAddr)]
if !exists {
t.Errorf("expected node %s not found in plan", nodeAddr)
continue
}
actualShards := shardBitsToSlice(shardBits)
if !equalIntSlices(actualShards, expectedShards) {
t.Errorf("node %s shards: expected %v, got %v", nodeAddr, expectedShards, actualShards)
}
}
t.Logf("Plan validation successful:")
t.Logf(" Volume: %d (%s)", plan.VolumeID, plan.Collection)
t.Logf(" Generation transition: %d -> %d", plan.CurrentGeneration, plan.TargetGeneration)
t.Logf(" Cleanup generations: %v", plan.GenerationsToCleanup)
t.Logf(" Nodes affected: %d", len(plan.SourceDistribution.Nodes))
// Estimate cleanup impact
impact := logic.EstimateCleanupImpact(plan, 1000000) // 1MB volume
t.Logf(" Estimated impact: %d shards deleted, %d bytes freed",
impact.ShardsToDelete, impact.EstimatedSizeFreed)
})
}
}
// Test data structures for comprehensive testing
type VolumeTopology struct {
VolumeID uint32
Collection string
Generation uint32
ShardDistribution map[string][]int // node -> shard IDs
Size uint64
DeletionRatio float64
}
type TopologyScenario struct {
Volumes []VolumeTopology
}
type GenerationData struct {
ShardDistribution map[string][]int // node -> shard IDs
FilesOnDisk []string
}
type MultiGenerationState struct {
VolumeID uint32
Collection string
Generations map[uint32]GenerationData
ActiveGeneration uint32
}
type ExpectedDeletionPlan struct {
SourceGeneration uint32
TargetGeneration uint32
GenerationsToDelete []uint32
ShardsToDeleteByNode map[string][]int
FilesToDeleteByNode map[string][]string
ExpectedFilesAfterCleanup []string
}
type GeneratedTask struct {
VolumeID uint32
Collection string
SourceGeneration uint32
TargetGeneration uint32
SourceNodes map[string][]int // node -> shard IDs
}
type TopologyTaskGenerator struct {
logic *EcVacuumLogic
}
func NewTopologyTaskGenerator() *TopologyTaskGenerator {
return &TopologyTaskGenerator{
logic: NewEcVacuumLogic(),
}
}
func (g *TopologyTaskGenerator) GenerateEcVacuumTasks(scenario TopologyScenario) ([]*GeneratedTask, error) {
var tasks []*GeneratedTask
for _, volume := range scenario.Volumes {
// Check if volume qualifies for vacuum (sufficient shards + deletion ratio)
if !g.qualifiesForVacuum(volume) {
continue
}
// Convert to task parameters
params := g.volumeTopologyToParams(volume)
// Create plan using logic
plan, err := g.logic.CreateVacuumPlan(volume.VolumeID, volume.Collection, params)
if err != nil {
return nil, fmt.Errorf("failed to create plan for volume %d: %w", volume.VolumeID, err)
}
// Convert plan to generated task
task := &GeneratedTask{
VolumeID: plan.VolumeID,
Collection: plan.Collection,
SourceGeneration: plan.CurrentGeneration,
TargetGeneration: plan.TargetGeneration,
SourceNodes: make(map[string][]int),
}
// Convert shard distribution
for node, shardBits := range plan.SourceDistribution.Nodes {
task.SourceNodes[string(node)] = shardBitsToSlice(shardBits)
}
tasks = append(tasks, task)
}
return tasks, nil
}
func (g *TopologyTaskGenerator) qualifiesForVacuum(volume VolumeTopology) bool {
// Check deletion ratio threshold (minimum 0.3)
if volume.DeletionRatio < 0.3 {
return false
}
// Check sufficient shards for reconstruction
totalShards := 0
for _, shards := range volume.ShardDistribution {
totalShards += len(shards)
}
return totalShards >= erasure_coding.DataShardsCount
}
func (g *TopologyTaskGenerator) volumeTopologyToParams(volume VolumeTopology) *worker_pb.TaskParams {
var sources []*worker_pb.TaskSource
for node, shardIds := range volume.ShardDistribution {
shardIds32 := make([]uint32, len(shardIds))
for i, id := range shardIds {
shardIds32[i] = uint32(id)
}
sources = append(sources, &worker_pb.TaskSource{
Node: node,
VolumeId: volume.VolumeID,
ShardIds: shardIds32,
Generation: volume.Generation,
})
}
return &worker_pb.TaskParams{
VolumeId: volume.VolumeID,
Sources: sources,
}
}
// Helper functions
func convertMultiGenerationStateToParams(state MultiGenerationState) *worker_pb.TaskParams {
// Use active generation as source
activeData := state.Generations[state.ActiveGeneration]
var sources []*worker_pb.TaskSource
for node, shardIds := range activeData.ShardDistribution {
shardIds32 := make([]uint32, len(shardIds))
for i, id := range shardIds {
shardIds32[i] = uint32(id)
}
sources = append(sources, &worker_pb.TaskSource{
Node: node,
VolumeId: state.VolumeID,
ShardIds: shardIds32,
Generation: state.ActiveGeneration,
})
}
return &worker_pb.TaskParams{
VolumeId: state.VolumeID,
Sources: sources,
}
}
func shardBitsToSlice(bits erasure_coding.ShardBits) []int {
var shards []int
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
if bits.HasShardId(erasure_coding.ShardId(i)) {
shards = append(shards, i)
}
}
return shards
}
func equalUint32Slices(a, b []uint32) bool {
if len(a) != len(b) {
return false
}
sortedA := make([]uint32, len(a))
sortedB := make([]uint32, len(b))
copy(sortedA, a)
copy(sortedB, b)
sort.Slice(sortedA, func(i, j int) bool { return sortedA[i] < sortedA[j] })
sort.Slice(sortedB, func(i, j int) bool { return sortedB[i] < sortedB[j] })
for i := range sortedA {
if sortedA[i] != sortedB[i] {
return false
}
}
return true
}
func equalIntSlices(a, b []int) bool {
if len(a) != len(b) {
return false
}
sortedA := make([]int, len(a))
sortedB := make([]int, len(b))
copy(sortedA, a)
copy(sortedB, b)
sort.Ints(sortedA)
sort.Ints(sortedB)
for i := range sortedA {
if sortedA[i] != sortedB[i] {
return false
}
}
return true
}
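
As a reading aid (not part of the commit): the generation-suffixed shard file names used in the fixtures above — base name for generation 0, a "_gN" suffix for later generations — could be produced by a hypothetical helper like the one below, assuming the package's existing fmt import. ecShardFileName is illustrative only.

// ecShardFileName is a hypothetical helper (not in this commit) mirroring the
// naming seen in the fixtures: "<collection>_<volumeID>.ecNN" for generation 0,
// "<collection>_<volumeID>_g<generation>.ecNN" for later generations,
// e.g. ecShardFileName("test", 500, 1, 0) == "test_500_g1.ec00".
func ecShardFileName(collection string, volumeID uint32, generation uint32, shardID int) string {
	base := fmt.Sprintf("%s_%d", collection, volumeID)
	if generation > 0 {
		base = fmt.Sprintf("%s_g%d", base, generation)
	}
	return fmt.Sprintf("%s.ec%02d", base, shardID)
}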

weed/worker/tasks/ec_vacuum/safety_checks_test.go (2 lines changed)

@@ -293,7 +293,7 @@ func TestSafetyCheckNoActiveOperations(t *testing.T) {
	task := createSafetyTestTask()
	// Verify grace period is reasonable
	assert.Equal(t, 5*time.Minute, task.cleanupGracePeriod, "Grace period should be 5 minutes")
	assert.Equal(t, 1*time.Minute, task.cleanupGracePeriod, "Grace period should be 1 minute")
	// Test that grace period logic passes
	// In a real scenario, this would check for active operations
