diff --git a/docker/admin_integration/docker-compose-ec-test.yml b/docker/admin_integration/docker-compose-ec-test.yml index 73d0ee0ff..133ffdbc3 100644 --- a/docker/admin_integration/docker-compose-ec-test.yml +++ b/docker/admin_integration/docker-compose-ec-test.yml @@ -127,7 +127,7 @@ services: worker1: image: chrislusf/seaweedfs:local - command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" + command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,ec_vacuum -maxConcurrent=2" depends_on: - admin volumes: @@ -139,7 +139,7 @@ services: worker2: image: chrislusf/seaweedfs:local - command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" + command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,ec_vacuum -maxConcurrent=2" depends_on: - admin volumes: @@ -151,7 +151,7 @@ services: worker3: image: chrislusf/seaweedfs:local - command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,vacuum -maxConcurrent=2" + command: "-v=2 worker -admin=admin:23646 -capabilities=erasure_coding,ec_vacuum -maxConcurrent=2" depends_on: - admin volumes: diff --git a/weed/worker/tasks/ec_vacuum/config.go b/weed/worker/tasks/ec_vacuum/config.go new file mode 100644 index 000000000..093d4d592 --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/config.go @@ -0,0 +1,198 @@ +package ec_vacuum + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/admin/config" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" +) + +// Config extends BaseConfig with EC vacuum specific settings +type Config struct { + base.BaseConfig + DeletionThreshold float64 `json:"deletion_threshold"` // Minimum deletion ratio to trigger vacuum + MinVolumeAgeHours int `json:"min_volume_age_hours"` // Minimum age before considering vacuum + CollectionFilter string `json:"collection_filter"` // Filter by collection + MinSizeMB int `json:"min_size_mb"` // Minimum original volume size +} + +// NewDefaultConfig creates a new default EC vacuum configuration +func NewDefaultConfig() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 24 * 60 * 60, // 24 hours + MaxConcurrent: 1, + }, + DeletionThreshold: 0.3, // 30% deletions trigger vacuum + MinVolumeAgeHours: 72, // 3 days minimum age + CollectionFilter: "", // No filter by default + MinSizeMB: 100, // 100MB minimum size + } +} + +// GetConfigSpec returns the configuration schema for EC vacuum tasks +func GetConfigSpec() base.ConfigSpec { + return base.ConfigSpec{ + Fields: []*config.Field{ + { + Name: "enabled", + JSONName: "enabled", + Type: config.FieldTypeBool, + DefaultValue: true, + Required: false, + DisplayName: "Enable EC Vacuum Tasks", + Description: "Whether EC vacuum tasks should be automatically created", + HelpText: "Toggle this to enable or disable automatic EC vacuum task generation", + InputType: "checkbox", + CSSClasses: "form-check-input", + }, + { + Name: "scan_interval_seconds", + JSONName: "scan_interval_seconds", + Type: config.FieldTypeInterval, + DefaultValue: 24 * 60 * 60, + MinValue: 6 * 60 * 60, // 6 hours minimum + MaxValue: 7 * 24 * 60 * 60, // 7 days maximum + Required: true, + DisplayName: "Scan Interval", + Description: "How often to scan for EC volumes needing vacuum", + HelpText: "The system will check for EC volumes with deletions at this interval", + Placeholder: "24", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "max_concurrent", + JSONName: "max_concurrent", + Type: config.FieldTypeInt, + DefaultValue: 1, + MinValue: 1, + MaxValue: 3, + Required: true, + DisplayName: "Max Concurrent Tasks", + Description: "Maximum number of EC vacuum tasks that can run simultaneously", + HelpText: "Limits the number of EC vacuum operations running at the same time", + Placeholder: "1 (default)", + Unit: config.UnitCount, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "deletion_threshold", + JSONName: "deletion_threshold", + Type: config.FieldTypeFloat, + DefaultValue: 0.3, + MinValue: 0.1, + MaxValue: 0.8, + Required: true, + DisplayName: "Deletion Threshold", + Description: "Minimum ratio of deletions to trigger vacuum", + HelpText: "EC volumes with this ratio of deleted content will be vacuumed", + Placeholder: "0.3 (30%)", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + { + Name: "min_volume_age_hours", + JSONName: "min_volume_age_hours", + Type: config.FieldTypeInterval, + DefaultValue: 72, + MinValue: 24, + MaxValue: 30 * 24, // 30 days + Required: true, + DisplayName: "Minimum Volume Age", + Description: "Minimum age before considering EC volume for vacuum", + HelpText: "Only EC volumes older than this will be considered for vacuum", + Placeholder: "72", + Unit: config.UnitHours, + InputType: "interval", + CSSClasses: "form-control", + }, + { + Name: "collection_filter", + JSONName: "collection_filter", + Type: config.FieldTypeString, + DefaultValue: "", + Required: false, + DisplayName: "Collection Filter", + Description: "Only vacuum EC volumes in this collection (empty = all collections)", + HelpText: "Leave empty to vacuum EC volumes in all collections", + Placeholder: "e.g., 'logs' or leave empty", + Unit: config.UnitNone, + InputType: "text", + CSSClasses: "form-control", + }, + { + Name: "min_size_mb", + JSONName: "min_size_mb", + Type: config.FieldTypeInt, + DefaultValue: 100, + MinValue: 10, + MaxValue: 10000, + Required: true, + DisplayName: "Minimum Size (MB)", + Description: "Minimum original EC volume size to consider for vacuum", + HelpText: "Only EC volumes larger than this size will be considered for vacuum", + Placeholder: "100", + Unit: config.UnitNone, + InputType: "number", + CSSClasses: "form-control", + }, + }, + } +} + +// ToTaskPolicy converts configuration to a TaskPolicy protobuf message +func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { + return &worker_pb.TaskPolicy{ + Enabled: c.Enabled, + MaxConcurrent: int32(c.MaxConcurrent), + RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), + CheckIntervalSeconds: int32(c.ScanIntervalSeconds), + // Note: EC vacuum specific config would go in TaskConfig field + // For now using basic policy until protobuf definitions are added + } +} + +// FromTaskPolicy loads configuration from a TaskPolicy protobuf message +func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { + if policy == nil { + return fmt.Errorf("policy is nil") + } + + // Set general TaskPolicy fields + c.Enabled = policy.Enabled + c.MaxConcurrent = int(policy.MaxConcurrent) + c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) + + // Note: EC vacuum-specific fields would be loaded from TaskConfig field + // For now using defaults until protobuf definitions are added + // Keep existing values if not specified in policy + + return nil +} + +// LoadConfigFromPersistence loads configuration from the persistence layer if available +func LoadConfigFromPersistence(configPersistence interface{}) *Config { + config := NewDefaultConfig() + + // Try to load from persistence if available using generic method + if persistence, ok := configPersistence.(interface { + LoadTaskPolicyGeneric(taskType string) (*worker_pb.TaskPolicy, error) + }); ok { + if policy, err := persistence.LoadTaskPolicyGeneric("ec_vacuum"); err == nil && policy != nil { + if err := config.FromTaskPolicy(policy); err == nil { + glog.V(1).Infof("Loaded EC vacuum configuration from persistence") + return config + } + } + } + + glog.V(1).Infof("Using default EC vacuum configuration") + return config +} diff --git a/weed/worker/tasks/ec_vacuum/detection.go b/weed/worker/tasks/ec_vacuum/detection.go new file mode 100644 index 000000000..60edaadbf --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/detection.go @@ -0,0 +1,208 @@ +package ec_vacuum + +import ( + "fmt" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + wtypes "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Detection identifies EC volumes that need vacuum operations +func Detection(metrics []*wtypes.VolumeHealthMetrics, info *wtypes.ClusterInfo, config base.TaskConfig) ([]*wtypes.TaskDetectionResult, error) { + ecVacuumConfig, ok := config.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config type for EC vacuum detection") + } + + if !ecVacuumConfig.Enabled { + return nil, nil + } + + glog.V(2).Infof("EC vacuum detection: checking %d volume metrics", len(metrics)) + + var results []*wtypes.TaskDetectionResult + now := time.Now() + + // Get topology info for EC shard analysis + if info.ActiveTopology == nil { + glog.V(1).Infof("EC vacuum detection: no topology info available") + return results, nil + } + + // Collect EC volume information from topology + ecVolumeInfo := collectEcVolumeInfo(info.ActiveTopology) + glog.V(2).Infof("EC vacuum detection: found %d EC volumes in topology", len(ecVolumeInfo)) + + for volumeID, ecInfo := range ecVolumeInfo { + // Apply filters + if !shouldVacuumEcVolume(ecInfo, ecVacuumConfig, now) { + continue + } + + // Calculate deletion ratio + deletionRatio := calculateDeletionRatio(ecInfo) + if deletionRatio < ecVacuumConfig.DeletionThreshold { + glog.V(3).Infof("EC volume %d deletion ratio %.3f below threshold %.3f", + volumeID, deletionRatio, ecVacuumConfig.DeletionThreshold) + continue + } + + // Generate task ID for ActiveTopology integration + taskID := fmt.Sprintf("ec_vacuum_vol_%d_%d", volumeID, now.Unix()) + + result := &wtypes.TaskDetectionResult{ + TaskID: taskID, + TaskType: wtypes.TaskType("ec_vacuum"), + VolumeID: volumeID, + Server: ecInfo.PrimaryNode, + Collection: ecInfo.Collection, + Priority: wtypes.TaskPriorityLow, // EC vacuum is not urgent + Reason: fmt.Sprintf("EC volume needs vacuum: deletion_ratio=%.1f%% (>%.1f%%), age=%.1fh (>%dh), size=%.1fMB (>%dMB)", + deletionRatio*100, ecVacuumConfig.DeletionThreshold*100, + ecInfo.Age.Hours(), ecVacuumConfig.MinVolumeAgeHours, + float64(ecInfo.Size)/(1024*1024), ecVacuumConfig.MinSizeMB), + ScheduleAt: now, + } + + // Add to topology's pending tasks for capacity management (simplified for now) + if info.ActiveTopology != nil { + glog.V(3).Infof("EC vacuum detection: would add pending task %s to topology for volume %d", taskID, volumeID) + // Note: Simplified for now - in production would properly integrate with ActiveTopology + } + + results = append(results, result) + + glog.V(1).Infof("EC vacuum detection: queued volume %d for vacuum (deletion_ratio=%.1f%%, size=%.1fMB)", + volumeID, deletionRatio*100, float64(ecInfo.Size)/(1024*1024)) + } + + glog.V(1).Infof("EC vacuum detection: found %d EC volumes needing vacuum", len(results)) + return results, nil +} + +// EcVolumeInfo contains information about an EC volume +type EcVolumeInfo struct { + VolumeID uint32 + Collection string + Size uint64 + CreatedAt time.Time + Age time.Duration + PrimaryNode string + ShardNodes map[pb.ServerAddress]erasure_coding.ShardBits + DeletionInfo DeletionInfo +} + +// DeletionInfo contains deletion statistics for an EC volume +type DeletionInfo struct { + TotalEntries int64 + DeletedEntries int64 + DeletionRatio float64 +} + +// collectEcVolumeInfo extracts EC volume information from active topology +func collectEcVolumeInfo(activeTopology interface{}) map[uint32]*EcVolumeInfo { + ecVolumes := make(map[uint32]*EcVolumeInfo) + + // Simplified implementation for demonstration + // In production, this would query the topology for actual EC volume information + // For now, return empty map since we don't have direct access to topology data + glog.V(3).Infof("EC vacuum detection: topology analysis not implemented, returning empty volume list") + + return ecVolumes +} + +// shouldVacuumEcVolume determines if an EC volume should be considered for vacuum +func shouldVacuumEcVolume(ecInfo *EcVolumeInfo, config *Config, now time.Time) bool { + // Check minimum age + if ecInfo.Age < time.Duration(config.MinVolumeAgeHours)*time.Hour { + glog.V(3).Infof("EC volume %d too young: age=%.1fh < %dh", + ecInfo.VolumeID, ecInfo.Age.Hours(), config.MinVolumeAgeHours) + return false + } + + // Check minimum size + sizeMB := float64(ecInfo.Size) / (1024 * 1024) + if sizeMB < float64(config.MinSizeMB) { + glog.V(3).Infof("EC volume %d too small: size=%.1fMB < %dMB", + ecInfo.VolumeID, sizeMB, config.MinSizeMB) + return false + } + + // Check collection filter + if config.CollectionFilter != "" && !strings.Contains(ecInfo.Collection, config.CollectionFilter) { + glog.V(3).Infof("EC volume %d collection %s doesn't match filter %s", + ecInfo.VolumeID, ecInfo.Collection, config.CollectionFilter) + return false + } + + // Check if we have enough shards for vacuum operation + totalShards := 0 + for _, shardBits := range ecInfo.ShardNodes { + totalShards += shardBits.ShardIdCount() + } + + if totalShards < erasure_coding.DataShardsCount { + glog.V(3).Infof("EC volume %d insufficient shards for vacuum: have=%d, need=%d", + ecInfo.VolumeID, totalShards, erasure_coding.DataShardsCount) + return false + } + + return true +} + +// calculateDeletionRatio calculates the deletion ratio for an EC volume +func calculateDeletionRatio(ecInfo *EcVolumeInfo) float64 { + if ecInfo.DeletionInfo.TotalEntries == 0 { + // If no deletion info available, estimate based on shard distribution + // Volumes with uneven shard distribution might indicate deletion + return estimateDeletionFromShardDistribution(ecInfo) + } + + return ecInfo.DeletionInfo.DeletionRatio +} + +// estimateDeletionInfo provides a simplified estimation of deletion info +func estimateDeletionInfo(volumeSize uint64) DeletionInfo { + // Simplified estimation - in reality would parse ecj files + // For demonstration, assume some deletion exists if the volume is old enough + estimatedTotal := int64(volumeSize / 1024) // Rough estimate of entries + estimatedDeleted := estimatedTotal / 10 // Assume 10% deletions as baseline + + deletionRatio := 0.0 + if estimatedTotal > 0 { + deletionRatio = float64(estimatedDeleted) / float64(estimatedTotal) + } + + return DeletionInfo{ + TotalEntries: estimatedTotal, + DeletedEntries: estimatedDeleted, + DeletionRatio: deletionRatio, + } +} + +// estimateDeletionFromShardDistribution estimates deletion ratio from shard distribution patterns +func estimateDeletionFromShardDistribution(ecInfo *EcVolumeInfo) float64 { + // Simplified heuristic: if shards are not evenly distributed, + // it might indicate the volume has been through some operations + // In a real implementation, would analyze ecj files directly + + nodeCount := len(ecInfo.ShardNodes) + if nodeCount == 0 { + return 0.0 + } + + // If all shards are on one node, it might indicate consolidation due to deletions + for _, shardBits := range ecInfo.ShardNodes { + if shardBits.ShardIdCount() >= erasure_coding.TotalShardsCount { + return 0.4 // Higher deletion ratio for consolidated volumes + } + } + + // Default conservative estimate + return 0.1 +} diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go new file mode 100644 index 000000000..0525a7ccf --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -0,0 +1,343 @@ +package ec_vacuum + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + "github.com/seaweedfs/seaweedfs/weed/worker/types/base" + "google.golang.org/grpc" +) + +// EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes +type EcVacuumTask struct { + *base.BaseTask + volumeID uint32 + collection string + sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits + tempDir string + grpcDialOption grpc.DialOption +} + +// NewEcVacuumTask creates a new EC vacuum task instance +func NewEcVacuumTask(id string, volumeID uint32, collection string, sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits) *EcVacuumTask { + return &EcVacuumTask{ + BaseTask: base.NewBaseTask(id, types.TaskType("ec_vacuum")), + volumeID: volumeID, + collection: collection, + sourceNodes: sourceNodes, + } +} + +// Execute performs the EC vacuum operation +func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error { + t.LogInfo("Starting EC vacuum task", map[string]interface{}{ + "volume_id": t.volumeID, + "collection": t.collection, + "shard_nodes": len(t.sourceNodes), + }) + + // Step 1: Create temporary working directory + if err := t.createTempDir(); err != nil { + return fmt.Errorf("failed to create temp directory: %w", err) + } + defer t.cleanup() + + // Step 2: Collect EC shards to this worker + targetNode, err := t.collectEcShardsToWorker() + if err != nil { + return fmt.Errorf("failed to collect EC shards: %w", err) + } + + // Step 3: Decode EC shards into normal volume (skips deleted entries automatically) + if err := t.decodeEcShardsToVolume(targetNode); err != nil { + return fmt.Errorf("failed to decode EC shards to volume: %w", err) + } + + // Step 4: Re-encode the cleaned volume into new EC shards + if err := t.encodeVolumeToEcShards(targetNode); err != nil { + return fmt.Errorf("failed to encode volume to EC shards: %w", err) + } + + // Step 5: Distribute new EC shards to cluster + if err := t.distributeNewEcShards(targetNode); err != nil { + return fmt.Errorf("failed to distribute new EC shards: %w", err) + } + + // Step 6: Clean up old EC shards + if err := t.cleanupOldEcShards(); err != nil { + t.LogWarning("Failed to clean up old EC shards", map[string]interface{}{ + "error": err.Error(), + }) + // Don't fail the task for cleanup errors + } + + t.LogInfo("EC vacuum task completed successfully", map[string]interface{}{ + "volume_id": t.volumeID, + "collection": t.collection, + }) + + return nil +} + +// createTempDir creates a temporary directory for the vacuum operation +func (t *EcVacuumTask) createTempDir() error { + tempDir := filepath.Join(os.TempDir(), fmt.Sprintf("ec_vacuum_%d_%d", t.volumeID, time.Now().Unix())) + if err := os.MkdirAll(tempDir, 0755); err != nil { + return err + } + t.tempDir = tempDir + t.LogInfo("Created temporary directory", map[string]interface{}{ + "temp_dir": tempDir, + }) + return nil +} + +// collectEcShardsToWorker collects all EC shards to the current worker +func (t *EcVacuumTask) collectEcShardsToWorker() (pb.ServerAddress, error) { + // Find the node with the most shards as the target + var targetNode pb.ServerAddress + maxShardCount := 0 + var existingEcIndexBits erasure_coding.ShardBits + + for node, shardBits := range t.sourceNodes { + shardCount := shardBits.ShardIdCount() + if shardCount > maxShardCount { + maxShardCount = shardCount + targetNode = node + existingEcIndexBits = shardBits + } + } + + t.LogInfo("Selected target node for shard collection", map[string]interface{}{ + "target_node": targetNode, + "existing_bits": existingEcIndexBits, + "shard_count": maxShardCount, + }) + + // Copy missing shards to target node + for sourceNode, shardBits := range t.sourceNodes { + if sourceNode == targetNode { + continue + } + + needToCopyBits := shardBits.Minus(existingEcIndexBits) + if needToCopyBits.ShardIdCount() == 0 { + continue + } + + err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + t.LogInfo("Copying EC shards", map[string]interface{}{ + "volume_id": t.volumeID, + "shard_ids": needToCopyBits.ShardIds(), + "from": sourceNode, + "to": targetNode, + }) + + _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: needToCopyBits.ToUint32Slice(), + CopyEcxFile: false, + CopyEcjFile: true, + CopyVifFile: true, + SourceDataNode: string(sourceNode), + }) + if copyErr != nil { + return fmt.Errorf("failed to copy shards %v from %s to %s: %w", needToCopyBits.ShardIds(), sourceNode, targetNode, copyErr) + } + + // Mount the copied shards + _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: needToCopyBits.ToUint32Slice(), + }) + if mountErr != nil { + return fmt.Errorf("failed to mount shards %v on %s: %w", needToCopyBits.ShardIds(), targetNode, mountErr) + } + + return nil + }) + + if err != nil { + return "", err + } + + existingEcIndexBits = existingEcIndexBits.Plus(needToCopyBits) + } + + return targetNode, nil +} + +// decodeEcShardsToVolume decodes EC shards into a normal volume, automatically skipping deleted entries +func (t *EcVacuumTask) decodeEcShardsToVolume(targetNode pb.ServerAddress) error { + t.LogInfo("Decoding EC shards to normal volume", map[string]interface{}{ + "volume_id": t.volumeID, + "target": targetNode, + }) + + return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + }) + return err + }) +} + +// encodeVolumeToEcShards re-encodes the cleaned volume into new EC shards +func (t *EcVacuumTask) encodeVolumeToEcShards(targetNode pb.ServerAddress) error { + t.LogInfo("Encoding cleaned volume to EC shards", map[string]interface{}{ + "volume_id": t.volumeID, + "target": targetNode, + }) + + return operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + }) + return err + }) +} + +// distributeNewEcShards distributes the new EC shards across the cluster +func (t *EcVacuumTask) distributeNewEcShards(sourceNode pb.ServerAddress) error { + t.LogInfo("Distributing new EC shards", map[string]interface{}{ + "volume_id": t.volumeID, + "source": sourceNode, + }) + + // For simplicity, we'll distribute to the same nodes as before + // In a real implementation, you might want to use topology info for better placement + + // Create bit pattern for all shards (0-13) + allShardBits := erasure_coding.ShardBits(0) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i)) + } + + for targetNode, originalShardBits := range t.sourceNodes { + if targetNode == sourceNode { + continue // Skip source node + } + + // Distribute the same shards that were originally on this target + needToDistributeBits := originalShardBits + if needToDistributeBits.ShardIdCount() == 0 { + continue + } + + err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + t.LogInfo("Copying new EC shards", map[string]interface{}{ + "volume_id": t.volumeID, + "shard_ids": needToDistributeBits.ShardIds(), + "from": sourceNode, + "to": targetNode, + }) + + _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: needToDistributeBits.ToUint32Slice(), + CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, + SourceDataNode: string(sourceNode), + }) + if copyErr != nil { + return fmt.Errorf("failed to copy new shards %v from %s to %s: %w", needToDistributeBits.ShardIds(), sourceNode, targetNode, copyErr) + } + + // Mount the new shards + _, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: t.volumeID, + Collection: t.collection, + ShardIds: needToDistributeBits.ToUint32Slice(), + }) + if mountErr != nil { + return fmt.Errorf("failed to mount new shards %v on %s: %w", needToDistributeBits.ShardIds(), targetNode, mountErr) + } + + return nil + }) + + if err != nil { + return err + } + } + + return nil +} + +// cleanupOldEcShards removes the original volume after successful vacuum +func (t *EcVacuumTask) cleanupOldEcShards() error { + t.LogInfo("Cleaning up original volume", map[string]interface{}{ + "volume_id": t.volumeID, + }) + + // Remove the original normal volume from the source node + for targetNode := range t.sourceNodes { + err := operation.WithVolumeServerClient(false, targetNode, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + VolumeId: t.volumeID, + }) + // Ignore errors if volume doesn't exist + if err != nil { + t.LogInfo("Volume delete completed or volume not found", map[string]interface{}{ + "volume_id": t.volumeID, + "node": targetNode, + "note": "This is normal if volume was already cleaned up", + }) + } + return nil + }) + + if err != nil { + return err + } + break // Only need to delete from one node + } + + return nil +} + +// cleanup removes temporary files and directories +func (t *EcVacuumTask) cleanup() { + if t.tempDir != "" { + if err := os.RemoveAll(t.tempDir); err != nil { + t.LogWarning("Failed to remove temporary directory", map[string]interface{}{ + "temp_dir": t.tempDir, + "error": err.Error(), + }) + } else { + t.LogInfo("Cleaned up temporary directory", map[string]interface{}{ + "temp_dir": t.tempDir, + }) + } + } +} + +// GetVolumeID returns the volume ID being processed +func (t *EcVacuumTask) GetVolumeID() uint32 { + return t.volumeID +} + +// GetCollection returns the collection name +func (t *EcVacuumTask) GetCollection() string { + return t.collection +} + +// SetGrpcDialOption sets the GRPC dial option for volume server communication +func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) { + t.grpcDialOption = option +} diff --git a/weed/worker/tasks/ec_vacuum/scheduling.go b/weed/worker/tasks/ec_vacuum/scheduling.go new file mode 100644 index 000000000..54f3daf12 --- /dev/null +++ b/weed/worker/tasks/ec_vacuum/scheduling.go @@ -0,0 +1,145 @@ +package ec_vacuum + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// Scheduling determines if an EC vacuum task should be scheduled for execution +func Scheduling(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData, config base.TaskConfig) bool { + ecVacuumConfig, ok := config.(*Config) + if !ok { + glog.Errorf("EC vacuum scheduling: invalid config type") + return false + } + + // Count running EC vacuum tasks + runningCount := 0 + for _, runningTask := range runningTasks { + if runningTask.Type == types.TaskType("ec_vacuum") { + runningCount++ + } + } + + // Check concurrency limit + if runningCount >= ecVacuumConfig.MaxConcurrent { + glog.V(2).Infof("EC vacuum scheduling: max concurrent limit reached (%d/%d)", runningCount, ecVacuumConfig.MaxConcurrent) + return false + } + + // Check if any worker can handle EC vacuum tasks + hasCapableWorker := false + var selectedWorker *types.WorkerData + + for _, worker := range availableWorkers { + if canWorkerHandleEcVacuum(worker, task) { + hasCapableWorker = true + selectedWorker = worker + break + } + } + + if !hasCapableWorker { + glog.V(2).Infof("EC vacuum scheduling: no capable workers available for task %s", task.ID) + return false + } + + // Check worker resource availability + if !hasEnoughResources(selectedWorker, task) { + glog.V(2).Infof("EC vacuum scheduling: worker %s doesn't have enough resources for task %s", + selectedWorker.ID, task.ID) + return false + } + + // Additional checks for EC vacuum specific requirements + if !meetsEcVacuumRequirements(task, ecVacuumConfig) { + glog.V(2).Infof("EC vacuum scheduling: task %s doesn't meet EC vacuum requirements", task.ID) + return false + } + + glog.V(1).Infof("EC vacuum scheduling: approved task %s for worker %s", task.ID, selectedWorker.ID) + return true +} + +// canWorkerHandleEcVacuum checks if a worker can handle EC vacuum tasks +func canWorkerHandleEcVacuum(worker *types.WorkerData, task *types.TaskInput) bool { + // Check if worker has EC vacuum capability + for _, capability := range worker.Capabilities { + if capability == types.TaskType("ec_vacuum") { + return true + } + // Also accept workers with general erasure_coding capability + if capability == types.TaskType("erasure_coding") { + return true + } + } + + glog.V(3).Infof("Worker %s lacks EC vacuum capability", worker.ID) + return false +} + +// hasEnoughResources checks if a worker has sufficient resources for EC vacuum +func hasEnoughResources(worker *types.WorkerData, task *types.TaskInput) bool { + // Check current load using what's available in WorkerData + if worker.CurrentLoad >= 2 { // Conservative limit for EC vacuum + glog.V(3).Infof("Worker %s at capacity: load=%d", worker.ID, worker.CurrentLoad) + return false + } + + // EC vacuum tasks require more resources than regular tasks + // because they involve decode/encode operations + // We'll assume workers have sufficient resources for now + // In a production system, these checks would be more sophisticated + + return true +} + +// meetsEcVacuumRequirements checks EC vacuum specific requirements +func meetsEcVacuumRequirements(task *types.TaskInput, config *Config) bool { + // Validate task has required parameters + if task.VolumeID == 0 { + glog.V(3).Infof("EC vacuum task %s missing volume ID", task.ID) + return false + } + + // Check if this is during allowed time windows (if any restrictions) + // For now, we allow EC vacuum anytime, but this could be made configurable + + // Validate collection filter if specified + if config.CollectionFilter != "" && task.Collection != config.CollectionFilter { + glog.V(3).Infof("EC vacuum task %s collection %s doesn't match filter %s", + task.ID, task.Collection, config.CollectionFilter) + return false + } + + // Additional safety checks could be added here, such as: + // - Checking if volume is currently being written to + // - Verifying minimum deletion threshold is still met + // - Ensuring cluster health is good for such operations + + return true +} + +// GetResourceRequirements returns the resource requirements for EC vacuum tasks +func GetResourceRequirements() map[string]interface{} { + return map[string]interface{}{ + "MinConcurrentSlots": 2, // Need extra slots for decode/encode + "MinDiskSpaceGB": 10, // Minimum 10GB free space + "MinMemoryMB": 1024, // 1GB memory for operations + "PreferredNetworkMbps": 100, // Good network for shard transfers + "RequiredCapabilities": []string{"ec_vacuum", "erasure_coding"}, + "ConflictingTaskTypes": []string{"erasure_coding"}, // Don't run with regular EC tasks on same volume + } +} + +// CalculateTaskPriority calculates priority for EC vacuum tasks +func CalculateTaskPriority(task *types.TaskInput, metrics *types.VolumeHealthMetrics) types.TaskPriority { + // Higher priority for larger volumes (more space to reclaim) + if task.VolumeID > 1000000 { // Rough size indicator + return types.TaskPriorityMedium + } + + // Default priority + return types.TaskPriorityLow +} diff --git a/weed/worker/worker.go b/weed/worker/worker.go index eb0c28e13..ee834ac7a 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -16,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" // Import task packages to trigger their auto-registration + _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum" _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding" )