diff --git a/weed/admin/handlers/maintenance_handlers.go b/weed/admin/handlers/maintenance_handlers.go index 8b5c25577..a587d1b96 100644 --- a/weed/admin/handlers/maintenance_handlers.go +++ b/weed/admin/handlers/maintenance_handlers.go @@ -449,41 +449,3 @@ func (h *MaintenanceHandlers) updateMaintenanceConfig(config *maintenance.Mainte // Delegate to AdminServer's real persistence method return h.adminServer.UpdateMaintenanceConfigData(config) } - -// floatPtr is a helper function to create float64 pointers -func floatPtr(f float64) *float64 { - return &f -} - -// Global templ UI registry - temporarily disabled -// var globalTemplUIRegistry *types.UITemplRegistry - -// initTemplUIRegistry initializes the global templ UI registry - temporarily disabled -func initTemplUIRegistry() { - // Temporarily disabled due to missing types - // if globalTemplUIRegistry == nil { - // globalTemplUIRegistry = types.NewUITemplRegistry() - // // Register vacuum templ UI provider using shared instances - // vacuumDetector, vacuumScheduler := vacuum.GetSharedInstances() - // vacuum.RegisterUITempl(globalTemplUIRegistry, vacuumDetector, vacuumScheduler) - // // Register erasure coding templ UI provider using shared instances - // erasureCodingDetector, erasureCodingScheduler := erasure_coding.GetSharedInstances() - // erasure_coding.RegisterUITempl(globalTemplUIRegistry, erasureCodingDetector, erasureCodingScheduler) - // // Register balance templ UI provider using shared instances - // balanceDetector, balanceScheduler := balance.GetSharedInstances() - // balance.RegisterUITempl(globalTemplUIRegistry, balanceDetector, balanceScheduler) - // } -} - -// getTemplUIProvider gets the templ UI provider for a task type - temporarily disabled -func getTemplUIProvider(taskType maintenance.MaintenanceTaskType) interface{} { - // initTemplUIRegistry() - // Convert maintenance task type to worker task type - // typesRegistry := tasks.GetGlobalTypesRegistry() - // for workerTaskType := range typesRegistry.GetAllDetectors() { - // if string(workerTaskType) == string(taskType) { - // return globalTemplUIRegistry.GetProvider(workerTaskType) - // } - // } - return nil -} diff --git a/weed/worker/tasks/balance/ui.go b/weed/worker/tasks/balance/ui.go index c8da586ce..6e34b68a2 100644 --- a/weed/worker/tasks/balance/ui.go +++ b/weed/worker/tasks/balance/ui.go @@ -1,6 +1,7 @@ package balance import ( + "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -62,7 +63,7 @@ func (logic *BalanceUILogic) GetCurrentConfig() interface{} { func (logic *BalanceUILogic) ApplyConfig(config interface{}) error { balanceConfig, ok := config.(*BalanceConfig) if !ok { - return nil // Will be handled by base provider fallback + return fmt.Errorf("invalid configuration type for balance") } // Apply to detector diff --git a/weed/worker/tasks/erasure_coding/ec.go b/weed/worker/tasks/erasure_coding/ec.go index 8d678199d..61c13188f 100644 --- a/weed/worker/tasks/erasure_coding/ec.go +++ b/weed/worker/tasks/erasure_coding/ec.go @@ -7,7 +7,6 @@ import ( "math" "os" "path/filepath" - "sort" "sync" "time" @@ -46,25 +45,6 @@ type Task struct { stepProgress map[string]float64 } -// ServerInfo holds information about available servers for shard placement -type ServerInfo struct { - Address string - DataCenter string - Rack string - AvailableSpace int64 - LoadScore float64 - ShardCount int -} - -// ShardPlacement represents where a shard should be placed -type ShardPlacement struct { - ShardID int - ServerAddr string - DataCenter string - Rack string - BackupAddrs []string // Alternative servers for redundancy -} - // NewTask creates a new erasure coding task func NewTask(sourceServer string, volumeID uint32) *Task { task := &Task{ @@ -320,33 +300,6 @@ func (t *Task) copyVolumeFile(client volume_server_pb.VolumeServerClient, ctx co return nil } -// markVolumeReadOnly marks the source volume as read-only -func (t *Task) markVolumeReadOnly() error { - t.currentStep = "marking_readonly" - t.SetProgress(20.0) - glog.V(1).Infof("Marking volume %d as read-only", t.volumeID) - - ctx := context.Background() - // Convert to gRPC address - grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) - conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) - if err != nil { - return fmt.Errorf("failed to connect to source server: %v", err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - _, err = client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("failed to mark volume read-only: %v", err) - } - - t.SetProgress(25.0) - return nil -} - // performLocalECEncoding performs Reed-Solomon encoding on local volume files func (t *Task) performLocalECEncoding(workDir string) ([]string, error) { t.currentStep = "encoding" @@ -455,324 +408,6 @@ func (t *Task) performLocalECEncoding(workDir string) ([]string, error) { return shardFiles, nil } -// calculateOptimalShardPlacement determines where to place each shard for optimal distribution -func (t *Task) calculateOptimalShardPlacement() ([]ShardPlacement, error) { - t.currentStep = "calculating_placement" - t.SetProgress(65.0) - glog.V(1).Infof("Calculating optimal shard placement for volume %d", t.volumeID) - - // Get available servers from master - servers, err := t.getAvailableServers() - if err != nil { - return nil, fmt.Errorf("failed to get available servers: %v", err) - } - - if len(servers) < t.totalShards { - return nil, fmt.Errorf("insufficient servers: need %d, have %d", t.totalShards, len(servers)) - } - - // Sort servers by placement desirability (considering space, load, affinity) - t.rankServersForPlacement(servers) - - // Assign shards to servers with affinity logic - placements := make([]ShardPlacement, t.totalShards) - usedServers := make(map[string]int) // Track how many shards per server - - for shardID := 0; shardID < t.totalShards; shardID++ { - server := t.selectBestServerForShard(servers, usedServers, shardID) - if server == nil { - return nil, fmt.Errorf("failed to find suitable server for shard %d", shardID) - } - - placements[shardID] = ShardPlacement{ - ShardID: shardID, - ServerAddr: server.Address, - DataCenter: server.DataCenter, - Rack: server.Rack, - BackupAddrs: t.selectBackupServers(servers, server, 2), - } - - usedServers[server.Address]++ - glog.V(2).Infof("Assigned shard %d to server %s (DC: %s, Rack: %s)", - shardID, server.Address, server.DataCenter, server.Rack) - } - - t.SetProgress(70.0) - glog.V(1).Infof("Calculated placement for %d shards across %d servers", - t.totalShards, len(usedServers)) - return placements, nil -} - -// getAvailableServers retrieves available servers from the master -func (t *Task) getAvailableServers() ([]*ServerInfo, error) { - ctx := context.Background() - conn, err := grpc.NewClient(t.masterClient, t.grpcDialOpt) - if err != nil { - return nil, fmt.Errorf("failed to connect to master: %v", err) - } - defer conn.Close() - - client := master_pb.NewSeaweedClient(conn) - resp, err := client.VolumeList(ctx, &master_pb.VolumeListRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to get volume list: %v", err) - } - - servers := make([]*ServerInfo, 0) - - // Parse topology information to extract server details - if resp.TopologyInfo != nil { - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, node := range rack.DataNodeInfos { - for diskType, diskInfo := range node.DiskInfos { - server := &ServerInfo{ - Address: fmt.Sprintf("%s:%d", node.Id, node.GrpcPort), - DataCenter: dc.Id, - Rack: rack.Id, - AvailableSpace: int64(diskInfo.FreeVolumeCount) * 32 * 1024 * 1024 * 1024, // Rough estimate - LoadScore: float64(diskInfo.ActiveVolumeCount) / float64(diskInfo.MaxVolumeCount), - ShardCount: 0, - } - - // Skip servers that are full or have high load - if diskInfo.FreeVolumeCount > 0 && server.LoadScore < 0.9 { - servers = append(servers, server) - glog.V(2).Infof("Available server: %s (DC: %s, Rack: %s, DiskType: %s, Load: %.2f)", - server.Address, server.DataCenter, server.Rack, diskType, server.LoadScore) - } - } - } - } - } - } - - return servers, nil -} - -// rankServersForPlacement sorts servers by desirability for shard placement -func (t *Task) rankServersForPlacement(servers []*ServerInfo) { - sort.Slice(servers, func(i, j int) bool { - serverA, serverB := servers[i], servers[j] - - // Primary criteria: lower load is better - if serverA.LoadScore != serverB.LoadScore { - return serverA.LoadScore < serverB.LoadScore - } - - // Secondary criteria: more available space is better - if serverA.AvailableSpace != serverB.AvailableSpace { - return serverA.AvailableSpace > serverB.AvailableSpace - } - - // Tertiary criteria: fewer existing shards is better - return serverA.ShardCount < serverB.ShardCount - }) -} - -// selectBestServerForShard selects the best server for a specific shard considering affinity -func (t *Task) selectBestServerForShard(servers []*ServerInfo, usedServers map[string]int, shardID int) *ServerInfo { - // For data shards (0-9), prefer distribution across different racks - // For parity shards (10-13), can be more flexible - isDataShard := shardID < t.dataShards - - var candidates []*ServerInfo - - if isDataShard { - // For data shards, prioritize rack diversity - usedRacks := make(map[string]bool) - for _, server := range servers { - if count, exists := usedServers[server.Address]; exists && count > 0 { - usedRacks[server.Rack] = true - } - } - - // First try to find servers in unused racks - for _, server := range servers { - if !usedRacks[server.Rack] && usedServers[server.Address] < 2 { // Max 2 shards per server - candidates = append(candidates, server) - } - } - - // If no unused racks, fall back to any available server - if len(candidates) == 0 { - for _, server := range servers { - if usedServers[server.Address] < 2 { - candidates = append(candidates, server) - } - } - } - } else { - // For parity shards, just avoid overloading servers - for _, server := range servers { - if usedServers[server.Address] < 2 { - candidates = append(candidates, server) - } - } - } - - if len(candidates) == 0 { - // Last resort: allow up to 3 shards per server - for _, server := range servers { - if usedServers[server.Address] < 3 { - candidates = append(candidates, server) - } - } - } - - if len(candidates) > 0 { - return candidates[0] // Already sorted by desirability - } - - return nil -} - -// selectBackupServers selects backup servers for redundancy -func (t *Task) selectBackupServers(servers []*ServerInfo, primaryServer *ServerInfo, count int) []string { - var backups []string - - for _, server := range servers { - if server.Address != primaryServer.Address && server.Rack != primaryServer.Rack { - backups = append(backups, server.Address) - if len(backups) >= count { - break - } - } - } - - return backups -} - -// distributeShards uploads shards to their assigned servers -func (t *Task) distributeShards(shardFiles []string, placements []ShardPlacement) error { - t.currentStep = "distributing_shards" - t.SetProgress(75.0) - glog.V(1).Infof("Distributing %d shards to target servers", len(placements)) - - // Distribute shards in parallel for better performance - successCount := 0 - errors := make([]error, 0) - - for i, placement := range placements { - shardFile := shardFiles[i] - - err := t.uploadShardToServer(shardFile, placement) - if err != nil { - glog.Errorf("Failed to upload shard %d to %s: %v", i, placement.ServerAddr, err) - errors = append(errors, err) - - // Try backup servers - uploaded := false - for _, backupAddr := range placement.BackupAddrs { - backupPlacement := placement - backupPlacement.ServerAddr = backupAddr - if err := t.uploadShardToServer(shardFile, backupPlacement); err == nil { - glog.V(1).Infof("Successfully uploaded shard %d to backup server %s", i, backupAddr) - uploaded = true - break - } - } - - if !uploaded { - return fmt.Errorf("failed to upload shard %d to any server", i) - } - } - - successCount++ - progress := 75.0 + (float64(successCount)/float64(len(placements)))*15.0 - t.SetProgress(progress) - - glog.V(2).Infof("Successfully distributed shard %d to %s", i, placement.ServerAddr) - } - - if len(errors) > 0 && successCount < len(placements)/2 { - return fmt.Errorf("too many shard distribution failures: %d/%d", len(errors), len(placements)) - } - - t.SetProgress(90.0) - glog.V(1).Infof("Successfully distributed %d/%d shards", successCount, len(placements)) - return nil -} - -// uploadShardToServer uploads a shard file to a specific server -func (t *Task) uploadShardToServer(shardFile string, placement ShardPlacement) error { - glog.V(2).Infof("Uploading shard %d to server %s", placement.ShardID, placement.ServerAddr) - - ctx := context.Background() - // Convert to gRPC address - grpcAddress := pb.ServerToGrpcAddress(placement.ServerAddr) - conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) - if err != nil { - return fmt.Errorf("failed to connect to server %s: %v", placement.ServerAddr, err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Upload shard using VolumeEcShardsCopy - this assumes shards are already generated locally - // and we're copying them to the target server - shardIds := []uint32{uint32(placement.ShardID)} - _, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: shardIds, - CopyEcxFile: true, - CopyEcjFile: true, - CopyVifFile: true, - }) - if err != nil { - return fmt.Errorf("failed to copy EC shard: %v", err) - } - - glog.V(2).Infof("Successfully uploaded shard %d to %s", placement.ShardID, placement.ServerAddr) - return nil -} - -// verifyAndCleanupSource verifies the EC conversion and cleans up the source volume -func (t *Task) verifyAndCleanupSource() error { - t.currentStep = "verify_cleanup" - t.SetProgress(95.0) - glog.V(1).Infof("Verifying EC conversion and cleaning up source volume %d", t.volumeID) - - ctx := context.Background() - // Convert to gRPC address - grpcAddress := pb.ServerToGrpcAddress(t.sourceServer) - conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) - if err != nil { - return fmt.Errorf("failed to connect to source server: %v", err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Verify source volume is read-only - statusResp, err := client.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{ - VolumeId: t.volumeID, - }) - if err == nil && statusResp.IsReadOnly { - glog.V(1).Infof("Source volume %d is confirmed read-only", t.volumeID) - } - - // Delete source volume files (optional - could be kept for backup) - // This would normally be done after confirming all shards are properly distributed - // _, err = client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ - // VolumeId: t.volumeID, - // }) - // if err != nil { - // glog.Warningf("Failed to delete source volume: %v", err) - // } - - return nil -} - -// cleanup removes temporary files and directories -func (t *Task) cleanup(workDir string) { - glog.V(1).Infof("Cleaning up work directory: %s", workDir) - if err := os.RemoveAll(workDir); err != nil { - glog.Warningf("Failed to cleanup work directory %s: %v", workDir, err) - } -} - // Validate validates the task parameters func (t *Task) Validate(params types.TaskParams) error { if params.VolumeID == 0 { @@ -1350,12 +985,6 @@ func (t *Task) uploadShardToTargetServer(shardFile string, targetServer pb.Serve return nil } -// uploadShardDataDirectly is no longer needed - kept for compatibility -func (t *Task) uploadShardDataDirectly(file *os.File, targetServer pb.ServerAddress, shardId uint32, fileSize int64) error { - // This method is deprecated in favor of gRPC streaming - return fmt.Errorf("uploadShardDataDirectly is deprecated - use gRPC ReceiveFile instead") -} - // mountShardOnServer mounts an EC shard on target server func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32) error { glog.V(1).Infof("MOUNT START: Mounting shard %d on server %s", shardId, targetServer) @@ -1387,99 +1016,3 @@ func (t *Task) mountShardOnServer(targetServer pb.ServerAddress, shardId uint32) glog.V(1).Infof("MOUNT SUCCESS: Shard %d successfully mounted on %s", shardId, targetServer) return nil } - -// uploadShardsToSourceServer uploads generated EC shards back to the source volume server -func (t *Task) uploadShardsToSourceServer(shardFiles []string) error { - glog.V(1).Infof("Uploading %d EC shards back to source server %s", len(shardFiles), t.sourceServer) - - // TODO: Implement actual upload mechanism - // This would upload the locally generated shards back to the source volume server - // so they can be distributed using the standard VolumeEcShardsCopy mechanism - - for i, shardFile := range shardFiles { - info, err := os.Stat(shardFile) - if err != nil { - return fmt.Errorf("shard file %s not found: %v", shardFile, err) - } - glog.V(2).Infof("Shard %d: %s (%d bytes) ready for upload", i, shardFile, info.Size()) - } - - // Placeholder - in production this would upload each shard file - // to the source volume server's disk location - glog.V(1).Infof("Placeholder: would upload %d shards to source server", len(shardFiles)) - return nil -} - -// distributeEcShardsFromSource distributes EC shards from source server using VolumeEcShardsCopy -func (t *Task) distributeEcShardsFromSource() error { - glog.V(1).Infof("Distributing EC shards from source server %s using VolumeEcShardsCopy", t.sourceServer) - - // Get available servers for distribution - availableServers, err := t.getAvailableServers() - if err != nil { - return fmt.Errorf("failed to get available servers: %v", err) - } - - if len(availableServers) < 4 { - return fmt.Errorf("insufficient servers for EC distribution: need at least 4, found %d", len(availableServers)) - } - - // Distribute shards using round-robin to available servers - for shardId := 0; shardId < t.totalShards; shardId++ { - targetServer := availableServers[shardId%len(availableServers)] - - // Skip if target is the same as source - if targetServer.Address == t.sourceServer { - continue - } - - err := t.copyAndMountSingleShard(targetServer.Address, uint32(shardId)) - if err != nil { - return fmt.Errorf("failed to copy and mount shard %d to %s: %v", shardId, targetServer.Address, err) - } - } - - return nil -} - -// copyAndMountSingleShard copies a single shard from source to target and mounts it -func (t *Task) copyAndMountSingleShard(targetServer string, shardId uint32) error { - glog.V(1).Infof("Copying and mounting shard %d from %s to %s", shardId, t.sourceServer, targetServer) - - ctx := context.Background() - grpcAddress := pb.ServerToGrpcAddress(targetServer) - conn, err := grpc.NewClient(grpcAddress, t.grpcDialOpt) - if err != nil { - return fmt.Errorf("failed to connect to %s: %v", targetServer, err) - } - defer conn.Close() - - client := volume_server_pb.NewVolumeServerClient(conn) - - // Copy shard using VolumeEcShardsCopy - _, err = client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: []uint32{shardId}, - CopyEcxFile: shardId == 0, // Only copy .ecx file with first shard - CopyEcjFile: true, - CopyVifFile: shardId == 0, // Only copy .vif file with first shard - SourceDataNode: t.sourceServer, - }) - if err != nil { - return fmt.Errorf("failed to copy shard %d: %v", shardId, err) - } - - // Mount shard - _, err = client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: t.volumeID, - Collection: t.collection, - ShardIds: []uint32{shardId}, - }) - if err != nil { - return fmt.Errorf("failed to mount shard %d: %v", shardId, err) - } - - glog.V(1).Infof("Successfully copied and mounted shard %d on %s", shardId, targetServer) - return nil -} diff --git a/weed/worker/tasks/erasure_coding/ec_detector.go b/weed/worker/tasks/erasure_coding/ec_detector.go index bd303ceb3..5fd13845f 100644 --- a/weed/worker/tasks/erasure_coding/ec_detector.go +++ b/weed/worker/tasks/erasure_coding/ec_detector.go @@ -25,14 +25,14 @@ var ( _ types.PolicyConfigurableDetector = (*EcDetector)(nil) ) -// NewEcDetector creates a new erasure coding detector with configurable defaults +// NewEcDetector creates a new erasure coding detector with production defaults func NewEcDetector() *EcDetector { return &EcDetector{ - enabled: true, // Enabled for testing - quietForSeconds: 0, // No quiet requirement for testing (was 24) - fullnessRatio: 0.90, // 90% full by default - minSizeMB: 50, // Minimum 50MB for testing (was 100MB) - scanInterval: 30 * time.Second, // Faster scanning for testing + enabled: false, // Conservative default - enable via configuration + quietForSeconds: 7 * 24 * 60 * 60, // 7 days quiet period + fullnessRatio: 0.90, // 90% full threshold + minSizeMB: 100, // Minimum 100MB volume size + scanInterval: 12 * time.Hour, // Scan every 12 hours collectionFilter: "", // No collection filter by default } } @@ -155,37 +155,50 @@ func (d *EcDetector) Configure(config map[string]interface{}) error { return nil } -// Legacy compatibility methods for existing code - +// SetEnabled sets whether the detector is enabled func (d *EcDetector) SetEnabled(enabled bool) { d.enabled = enabled } -func (d *EcDetector) SetVolumeAgeSeconds(seconds int) { - d.quietForSeconds = seconds -} - -func (d *EcDetector) SetVolumeAgeHours(hours int) { - d.quietForSeconds = hours * 3600 // Convert hours to seconds -} - +// SetQuietForSeconds sets the quiet duration threshold in seconds func (d *EcDetector) SetQuietForSeconds(seconds int) { d.quietForSeconds = seconds } +// SetFullnessRatio sets the fullness ratio threshold func (d *EcDetector) SetFullnessRatio(ratio float64) { d.fullnessRatio = ratio } +// SetCollectionFilter sets the collection filter func (d *EcDetector) SetCollectionFilter(filter string) { d.collectionFilter = filter } +// SetScanInterval sets the scan interval func (d *EcDetector) SetScanInterval(interval time.Duration) { d.scanInterval = interval } -// PolicyConfigurableDetector interface implementation +// GetQuietForSeconds returns the current quiet duration threshold in seconds +func (d *EcDetector) GetQuietForSeconds() int { + return d.quietForSeconds +} + +// GetFullnessRatio returns the current fullness ratio threshold +func (d *EcDetector) GetFullnessRatio() float64 { + return d.fullnessRatio +} + +// GetCollectionFilter returns the current collection filter +func (d *EcDetector) GetCollectionFilter() string { + return d.collectionFilter +} + +// GetScanInterval returns the scan interval +func (d *EcDetector) GetScanInterval() time.Duration { + return d.scanInterval +} // ConfigureFromPolicy configures the detector from maintenance policy func (d *EcDetector) ConfigureFromPolicy(policy interface{}) { @@ -211,33 +224,3 @@ func (d *EcDetector) ConfigureFromPolicy(policy interface{}) { glog.Warningf("ConfigureFromPolicy received unknown policy type: %T", policy) } } - -// GetVolumeAgeSeconds returns the current volume age threshold in seconds (legacy method) -func (d *EcDetector) GetVolumeAgeSeconds() int { - return d.quietForSeconds -} - -// GetVolumeAgeHours returns the current volume age threshold in hours (legacy method) -func (d *EcDetector) GetVolumeAgeHours() int { - return d.quietForSeconds / 3600 // Convert seconds to hours -} - -// GetQuietForSeconds returns the current quiet duration threshold in seconds -func (d *EcDetector) GetQuietForSeconds() int { - return d.quietForSeconds -} - -// GetFullnessRatio returns the current fullness ratio threshold -func (d *EcDetector) GetFullnessRatio() float64 { - return d.fullnessRatio -} - -// GetCollectionFilter returns the current collection filter -func (d *EcDetector) GetCollectionFilter() string { - return d.collectionFilter -} - -// GetScanInterval returns the scan interval -func (d *EcDetector) GetScanInterval() time.Duration { - return d.scanInterval -} diff --git a/weed/worker/tasks/erasure_coding/ec_scheduler.go b/weed/worker/tasks/erasure_coding/ec_scheduler.go index b2366bb06..d74c4c6ae 100644 --- a/weed/worker/tasks/erasure_coding/ec_scheduler.go +++ b/weed/worker/tasks/erasure_coding/ec_scheduler.go @@ -79,25 +79,6 @@ func (s *Scheduler) GetPriority(task *types.Task) types.TaskPriority { return types.TaskPriorityLow // EC is not urgent } -// WasTaskRecentlyCompleted checks if a similar task was recently completed -func (s *Scheduler) WasTaskRecentlyCompleted(task *types.Task, completedTasks []*types.Task, now time.Time) bool { - // Don't repeat EC for 24 hours - interval := 24 * time.Hour - cutoff := now.Add(-interval) - - for _, completedTask := range completedTasks { - if completedTask.Type == types.TaskTypeErasureCoding && - completedTask.VolumeID == task.VolumeID && - completedTask.Server == task.Server && - completedTask.Status == types.TaskStatusCompleted && - completedTask.CompletedAt != nil && - completedTask.CompletedAt.After(cutoff) { - return true - } - } - return false -} - // IsEnabled returns whether this task type is enabled func (s *Scheduler) IsEnabled() bool { return s.enabled diff --git a/weed/worker/tasks/erasure_coding/ui.go b/weed/worker/tasks/erasure_coding/ui.go index e06d2ae84..2ab338722 100644 --- a/weed/worker/tasks/erasure_coding/ui.go +++ b/weed/worker/tasks/erasure_coding/ui.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -65,7 +66,7 @@ func (logic *ErasureCodingUILogic) GetCurrentConfig() interface{} { func (logic *ErasureCodingUILogic) ApplyConfig(config interface{}) error { ecConfig, ok := config.(ErasureCodingConfig) if !ok { - return nil // Will be handled by base provider fallback + return fmt.Errorf("invalid configuration type for erasure coding") } // Apply to detector diff --git a/weed/worker/tasks/vacuum/ui.go b/weed/worker/tasks/vacuum/ui.go index cfaa113a1..a6176ffa6 100644 --- a/weed/worker/tasks/vacuum/ui.go +++ b/weed/worker/tasks/vacuum/ui.go @@ -1,6 +1,7 @@ package vacuum import ( + "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -65,7 +66,7 @@ func (logic *VacuumUILogic) GetCurrentConfig() interface{} { func (logic *VacuumUILogic) ApplyConfig(config interface{}) error { vacuumConfig, ok := config.(*VacuumConfig) if !ok { - return nil // Will be handled by base provider fallback + return fmt.Errorf("invalid configuration type for vacuum") } // Apply to detector