diff --git a/.github/workflows/ec-integration.yml b/.github/workflows/ec-integration.yml new file mode 100644 index 000000000..266ce1761 --- /dev/null +++ b/.github/workflows/ec-integration.yml @@ -0,0 +1,49 @@ +name: EC Integration Tests + +on: + push: + branches: [ master ] + paths: + - 'weed/admin/**' + - 'weed/worker/**' + - 'test/erasure_coding/admin_dockertest/**' + - '.github/workflows/ec-integration.yml' + pull_request: + branches: [ master ] + paths: + - 'weed/admin/**' + - 'weed/worker/**' + - 'test/erasure_coding/admin_dockertest/**' + - '.github/workflows/ec-integration.yml' + +jobs: + ec-integration-test: + runs-on: ubuntu-latest + timeout-minutes: 15 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.24' + + - name: Build weed binary + run: | + cd weed + go build -o ../weed_bin + + - name: Run EC integration tests + run: | + cd test/erasure_coding/admin_dockertest + go test -v -timeout 15m ec_integration_test.go + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: ec-test-logs + path: test/erasure_coding/admin_dockertest/tmp/logs/ + retention-days: 7 diff --git a/.gitignore b/.gitignore index f8e614b17..a4446b7de 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,6 @@ test/s3/remote_cache/primary-server.pid *.id *.pid test/s3/iam/.test_env +/test/erasure_coding/admin_dockertest/tmp +/test/erasure_coding/admin_dockertest/task_logs +weed_bin diff --git a/test/erasure_coding/admin_dockertest/Makefile b/test/erasure_coding/admin_dockertest/Makefile new file mode 100644 index 000000000..6d2ca643a --- /dev/null +++ b/test/erasure_coding/admin_dockertest/Makefile @@ -0,0 +1,7 @@ +.PHONY: test clean + +test: + go test -v . + +clean: + rm -rf tmp \ No newline at end of file diff --git a/test/erasure_coding/admin_dockertest/ec_integration_test.go b/test/erasure_coding/admin_dockertest/ec_integration_test.go new file mode 100644 index 000000000..9f8c44c82 --- /dev/null +++ b/test/erasure_coding/admin_dockertest/ec_integration_test.go @@ -0,0 +1,404 @@ +package admin_dockertest + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "testing" + "time" +) + +const ( + AdminUrl = "http://localhost:23646" + MasterUrl = "http://localhost:9333" + FilerUrl = "http://localhost:8888" +) + +// Helper to run commands in background and track PIDs for cleanup +var runningCmds []*exec.Cmd + +func cleanup() { + for _, cmd := range runningCmds { + if cmd.Process != nil { + cmd.Process.Kill() + } + } +} + +func startWeed(t *testing.T, name string, args ...string) *exec.Cmd { + cmd := exec.Command("./weed_bin", args...) + + // Create logs dir in local ./tmp + wd, _ := os.Getwd() + logDir := filepath.Join(wd, "tmp", "logs") + os.MkdirAll(logDir, 0755) + + logFile, err := os.Create(filepath.Join(logDir, name+".log")) + if err != nil { + t.Fatalf("Failed to create log file: %v", err) + } + + cmd.Stdout = logFile + cmd.Stderr = logFile + // Set Cwd to test directory so it finds local ./tmp + cmd.Dir = wd + + // assume "weed_bin" binary is in project root. + rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd))) + cmd.Path = filepath.Join(rootDir, "weed_bin") + + err = cmd.Start() + if err != nil { + t.Fatalf("Failed to start weed %v: %v", args, err) + } + runningCmds = append(runningCmds, cmd) + return cmd +} + +func stopWeed(t *testing.T, cmd *exec.Cmd) { + if cmd != nil && cmd.Process != nil { + t.Logf("Stopping process %d", cmd.Process.Pid) + cmd.Process.Kill() + cmd.Wait() + + // Remove from runningCmds to avoid double kill in cleanup + for i, c := range runningCmds { + if c == cmd { + runningCmds = append(runningCmds[:i], runningCmds[i+1:]...) + break + } + } + } +} + +func ensureEnvironment(t *testing.T) { + // 1. Build weed binary + wd, _ := os.Getwd() + rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd))) // Up 3 levels + + buildCmd := exec.Command("go", "build", "-o", "weed_bin", "./weed") + buildCmd.Dir = rootDir + buildCmd.Stdout = os.Stdout + buildCmd.Stderr = os.Stderr + if err := buildCmd.Run(); err != nil { + t.Fatalf("Failed to build weed: %v", err) + } + t.Log("Successfully built weed binary") + + // 2. Start Master + // Use local ./tmp/master + os.RemoveAll("tmp") + err := os.MkdirAll(filepath.Join("tmp", "master"), 0755) + if err != nil { + t.Fatalf("Failed to create tmp dir: %v", err) + } + + startWeed(t, "master", "master", "-mdir=./tmp/master", "-port=9333", "-ip=localhost", "-peers=none", "-volumeSizeLimitMB=100") + + // Wait for master + waitForUrl(t, MasterUrl+"/cluster/status", 10) + + // 3. Start Volume Server (Worker) + // Start 14 volume servers to verify RS(10,4) default EC + for i := 1; i <= 14; i++ { + volName := fmt.Sprintf("volume%d", i) + port := 8080 + i - 1 + dir := filepath.Join("tmp", volName) + os.MkdirAll(dir, 0755) + startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost") + } + + // 4. Start Filer + os.MkdirAll(filepath.Join("tmp", "filer"), 0755) + startWeed(t, "filer", "filer", "-defaultStoreDir=./tmp/filer", "-master=localhost:9333", "-port=8888", "-ip=localhost") + waitForUrl(t, FilerUrl+"/", 60) + + // 5. Start Workers (Maintenance) + // We need workers to execute EC tasks + for i := 1; i <= 2; i++ { + workerName := fmt.Sprintf("worker%d", i) + metricsPort := 9327 + i - 1 + debugPort := 6060 + i + dir, _ := filepath.Abs(filepath.Join("tmp", workerName)) + os.MkdirAll(dir, 0755) + startWeed(t, workerName, "worker", "-admin=localhost:23646", "-workingDir="+dir, fmt.Sprintf("-metricsPort=%d", metricsPort), fmt.Sprintf("-debug.port=%d", debugPort)) + } + + // 6. Start Admin + os.RemoveAll(filepath.Join("tmp", "admin")) + os.MkdirAll(filepath.Join("tmp", "admin"), 0755) + startWeed(t, "admin", "admin", "-master=localhost:9333", "-port=23646", "-dataDir=./tmp/admin") + waitForUrl(t, AdminUrl+"/health", 60) + + t.Log("Environment started successfully") +} + +func waitForUrl(t *testing.T, url string, retries int) { + for i := 0; i < retries; i++ { + resp, err := http.Get(url) + if err == nil && resp.StatusCode == 200 { + resp.Body.Close() + return + } + time.Sleep(1 * time.Second) + } + t.Fatalf("Timeout waiting for %s", url) +} + +func TestEcEndToEnd(t *testing.T) { + defer cleanup() + ensureEnvironment(t) + + client := &http.Client{} + + // 1. Configure Global Maintenance (Scan Interval = 1s) via API + t.Log("Configuring Global Maintenance via API...") + + // 1.1 Fetch current config + req, _ := http.NewRequest("GET", AdminUrl+"/api/maintenance/config", nil) + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Failed to get global config: %v", err) + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to get global config (status %d): %s", resp.StatusCode, string(body)) + } + + var globalConfig map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&globalConfig); err != nil { + t.Fatalf("Failed to decode global config: %v", err) + } + resp.Body.Close() + + // 1.2 Modify config + globalConfig["enabled"] = true + globalConfig["scan_interval_seconds"] = 1 + + // Ensure policy structure exists + if globalConfig["policy"] == nil { + globalConfig["policy"] = map[string]interface{}{} + } + policy, _ := globalConfig["policy"].(map[string]interface{}) + + // Ensure task_policies structure exists + if policy["task_policies"] == nil { + policy["task_policies"] = map[string]interface{}{} + } + taskPolicies, _ := policy["task_policies"].(map[string]interface{}) + + // Disable balance tasks to avoid interference with EC test + if taskPolicies["balance"] == nil { + taskPolicies["balance"] = map[string]interface{}{} + } + balancePolicy, _ := taskPolicies["balance"].(map[string]interface{}) + balancePolicy["enabled"] = false + + // Set global max concurrent + policy["global_max_concurrent"] = 4 + globalConfig["policy"] = policy + + // Explicitly set required fields + requiredFields := map[string]float64{ + "worker_timeout_seconds": 300, + "task_timeout_seconds": 7200, + "retry_delay_seconds": 900, + "cleanup_interval_seconds": 86400, + "task_retention_seconds": 604800, + "max_retries": 3, + } + for field, val := range requiredFields { + if _, ok := globalConfig[field]; !ok || globalConfig[field] == 0 { + globalConfig[field] = val + } + } + + // 1.3 Update config + jsonBody, _ := json.Marshal(globalConfig) + req, _ = http.NewRequest("PUT", AdminUrl+"/api/maintenance/config", bytes.NewBuffer(jsonBody)) + req.Header.Set("Content-Type", "application/json") + resp, err = client.Do(req) + if err != nil { + t.Fatalf("Failed to update global config: %v", err) + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to update global config (status %d): %s", resp.StatusCode, string(body)) + } + resp.Body.Close() + + // 2. Configure EC Task (Short intervals) via Form API + t.Log("Configuring EC Task via Form API...") + formData := url.Values{} + formData.Set("enabled", "true") + formData.Set("scan_interval_seconds", "1") + formData.Set("repeat_interval_seconds", "1") + formData.Set("check_interval_seconds", "1") + formData.Set("max_concurrent", "4") + formData.Set("quiet_for_seconds_value", "1") + formData.Set("quiet_for_seconds_unit", "seconds") + formData.Set("min_size_mb", "1") + formData.Set("fullness_ratio", "0.0001") + + req, _ = http.NewRequest("POST", AdminUrl+"/maintenance/config/erasure_coding", strings.NewReader(formData.Encode())) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err = client.Do(req) + if err != nil { + t.Fatalf("Failed to update EC config: %v", err) + } + if resp.StatusCode != 200 && resp.StatusCode != 303 { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Failed to update EC config (status %d): %s", resp.StatusCode, string(body)) + } + resp.Body.Close() + t.Log("EC Task Configuration updated") + + // 3. Restart Admin to pick up Global Config (Scan Interval) + if len(runningCmds) > 0 { + adminCmd := runningCmds[len(runningCmds)-1] + t.Log("Restarting Admin Server to apply configuration...") + stopWeed(t, adminCmd) + time.Sleep(10 * time.Second) + startWeed(t, "admin_restarted", "admin", "-master=localhost:9333", "-port=23646", "-port.grpc=33646", "-dataDir=./tmp/admin") + waitForUrl(t, AdminUrl+"/health", 60) + } + + // 4. Upload a file + fileSize := 5 * 1024 * 1024 + data := make([]byte, fileSize) + rand.Read(data) + fileName := fmt.Sprintf("ec_test_file_%d", time.Now().Unix()) + t.Logf("Uploading %d bytes file %s to Filer...", fileSize, fileName) + uploadUrl := FilerUrl + "/" + fileName + + var uploadErr error + for i := 0; i < 10; i++ { + req, _ := http.NewRequest("PUT", uploadUrl, bytes.NewBuffer(data)) + resp, err := client.Do(req) + if err == nil { + if resp.StatusCode == 201 { + resp.Body.Close() + uploadErr = nil + break + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + uploadErr = fmt.Errorf("status %d: %s", resp.StatusCode, string(body)) + } else { + uploadErr = err + } + t.Logf("Upload attempt %d failed: %v", i+1, uploadErr) + time.Sleep(2 * time.Second) + } + + if uploadErr != nil { + t.Fatalf("Failed to upload file after retries: %v", uploadErr) + } + t.Log("Upload successful") + + // 5. Verify EC Encoding + t.Log("Waiting for EC encoding (checking Master topology)...") + startTime := time.Now() + ecVerified := false + var lastBody []byte + + for time.Since(startTime) < 300*time.Second { + // 5.1 Check Master Topology + resp, err := http.Get(MasterUrl + "/dir/status") + if err == nil { + lastBody, _ = ioutil.ReadAll(resp.Body) + resp.Body.Close() + + // Check total EC shards + reShards := regexp.MustCompile(`"EcShards":\s*(\d+)`) + matches := reShards.FindAllSubmatch(lastBody, -1) + totalShards := 0 + for _, m := range matches { + var count int + fmt.Sscanf(string(m[1]), "%d", &count) + totalShards += count + } + + if totalShards > 0 { + t.Logf("EC encoding verified (found %d total EcShards in topology) after %d seconds", totalShards, int(time.Since(startTime).Seconds())) + ecVerified = true + break + } + } + + // 5.2 Debug: Check workers and tasks + wResp, wErr := http.Get(AdminUrl + "/api/maintenance/workers") + workerCount := 0 + if wErr == nil { + var workers []interface{} + json.NewDecoder(wResp.Body).Decode(&workers) + wResp.Body.Close() + workerCount = len(workers) + } + + tResp, tErr := http.Get(AdminUrl + "/api/maintenance/tasks") + taskCount := 0 + if tErr == nil { + var tasks []interface{} + json.NewDecoder(tResp.Body).Decode(&tasks) + tResp.Body.Close() + taskCount = len(tasks) + } + t.Logf("Waiting for EC... (Workers: %d, Active Tasks: %d)", workerCount, taskCount) + + time.Sleep(10 * time.Second) + } + + if !ecVerified { + dumpLogs(t) + t.Fatalf("Timed out waiting for EC encoding verified in Topology. Last body: %s", string(lastBody)) + } + + // 6. Verification: Read back the file + t.Log("Reading back file...") + resp, err = http.Get(uploadUrl) + if err != nil { + dumpLogs(t) + t.Fatalf("Failed to read back file: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + dumpLogs(t) + t.Fatalf("Read back failed status: %d", resp.StatusCode) + } + content, _ := io.ReadAll(resp.Body) + if len(content) != fileSize { + dumpLogs(t) + t.Fatalf("Read back size mismatch: got %d, want %d", len(content), fileSize) + } + + // Verify byte-wise content equality + if !bytes.Equal(content, data) { + dumpLogs(t) + t.Fatalf("Read back content mismatch: uploaded and downloaded data differ") + } + + t.Log("Test PASS: EC encoding and read back successful!") +} + +func dumpLogs(t *testing.T) { + wd, _ := os.Getwd() + logDir := filepath.Join(wd, "tmp", "logs") + files, _ := os.ReadDir(logDir) + for _, f := range files { + if strings.HasSuffix(f.Name(), ".log") { + content, _ := os.ReadFile(filepath.Join(logDir, f.Name())) + t.Logf("--- LOG DUMP: %s ---\n%s\n--- END LOG ---", f.Name(), string(content)) + } + } +} diff --git a/test/fuse_integration/README.md b/test/fuse_integration/README.md index 6f520eaf5..8c0387806 100644 --- a/test/fuse_integration/README.md +++ b/test/fuse_integration/README.md @@ -216,7 +216,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: '1.21' + go-version: '1.24' - name: Install FUSE run: sudo apt-get install -y fuse diff --git a/weed/admin/config/schema.go b/weed/admin/config/schema.go index 54fb615f9..fe7984701 100644 --- a/weed/admin/config/schema.go +++ b/weed/admin/config/schema.go @@ -113,6 +113,11 @@ func SecondsToIntervalValueUnit(totalSeconds int) (int, string) { return 0, "minutes" } + // Preserve seconds when not divisible by minutes + if totalSeconds < 60 || totalSeconds%60 != 0 { + return totalSeconds, "seconds" + } + // Check if it's evenly divisible by days if totalSeconds%(24*3600) == 0 { return totalSeconds / (24 * 3600), "days" @@ -136,6 +141,8 @@ func IntervalValueUnitToSeconds(value int, unit string) int { return value * 3600 case "minutes": return value * 60 + case "seconds": + return value default: return value * 60 // Default to minutes } diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index 1fe1a9b42..6578ee890 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -620,6 +620,26 @@ func (cp *ConfigPersistence) loadTaskConfig(filename string, config proto.Messag return nil } +// SaveTaskPolicy generic dispatcher for task persistence +func (cp *ConfigPersistence) SaveTaskPolicy(taskType string, policy *worker_pb.TaskPolicy) error { + switch taskType { + case "vacuum": + return cp.SaveVacuumTaskPolicy(policy) + case "erasure_coding": + return cp.SaveErasureCodingTaskPolicy(policy) + case "balance": + return cp.SaveBalanceTaskPolicy(policy) + case "replication": + return cp.SaveReplicationTaskPolicy(policy) + } + return fmt.Errorf("unknown task type: %s", taskType) +} + +// SaveReplicationTaskPolicy saves complete replication task policy to protobuf file +func (cp *ConfigPersistence) SaveReplicationTaskPolicy(policy *worker_pb.TaskPolicy) error { + return cp.saveTaskConfig(ReplicationTaskConfigFile, policy) +} + // GetDataDir returns the data directory path func (cp *ConfigPersistence) GetDataDir() string { return cp.dataDir diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index c4b187797..34e9a9284 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -395,6 +395,23 @@ func (s *WorkerGrpcServer) handleTaskRequest(conn *WorkerConnection, request *wo glog.Warningf("Failed to send task assignment to worker %s", conn.workerID) } } else { + // Send explicit "No Task" response to prevent worker timeout + // Workers expect a TaskAssignment message but will sleep if TaskId is empty + noTaskAssignment := &worker_pb.AdminMessage{ + Timestamp: time.Now().Unix(), + Message: &worker_pb.AdminMessage_TaskAssignment{ + TaskAssignment: &worker_pb.TaskAssignment{ + TaskId: "", // Empty TaskId indicates no task available + }, + }, + } + + select { + case conn.outgoing <- noTaskAssignment: + glog.V(2).Infof("Sent 'No Task' response to worker %s", conn.workerID) + case <-time.After(time.Second): + // If we can't send, the worker will eventually time out and reconnect, which is fine + } } } diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go index dfa413e70..a6cef590c 100644 --- a/weed/admin/maintenance/maintenance_integration.go +++ b/weed/admin/maintenance/maintenance_integration.go @@ -493,3 +493,62 @@ func (s *MaintenanceIntegration) GetPendingOperations() *PendingOperations { func (s *MaintenanceIntegration) GetActiveTopology() *topology.ActiveTopology { return s.activeTopology } + +// SyncTask synchronizes a maintenance task with the active topology for capacity tracking +func (s *MaintenanceIntegration) SyncTask(task *MaintenanceTask) { + if s.activeTopology == nil { + return + } + + // Convert task type + taskType, exists := s.revTaskTypeMap[task.Type] + if !exists { + return + } + + // Convert status + var status topology.TaskStatus + switch task.Status { + case TaskStatusPending: + status = topology.TaskStatusPending + case TaskStatusAssigned, TaskStatusInProgress: + status = topology.TaskStatusInProgress + default: + return // Don't sync completed/failed/cancelled tasks + } + + // Extract sources and destinations from TypedParams + var sources []topology.TaskSource + var destinations []topology.TaskDestination + var estimatedSize int64 + + if task.TypedParams != nil { + // Use unified sources and targets from TaskParams + for _, src := range task.TypedParams.Sources { + sources = append(sources, topology.TaskSource{ + SourceServer: src.Node, + SourceDisk: src.DiskId, + }) + // Sum estimated size from all sources + estimatedSize += int64(src.EstimatedSize) + } + for _, target := range task.TypedParams.Targets { + destinations = append(destinations, topology.TaskDestination{ + TargetServer: target.Node, + TargetDisk: target.DiskId, + }) + } + + // Handle type-specific params for additional task-specific sync logic + if vacuumParams := task.TypedParams.GetVacuumParams(); vacuumParams != nil { + // TODO: Add vacuum-specific sync logic if necessary + } else if ecParams := task.TypedParams.GetErasureCodingParams(); ecParams != nil { + // TODO: Add EC-specific sync logic if necessary + } else if balanceParams := task.TypedParams.GetBalanceParams(); balanceParams != nil { + // TODO: Add balance-specific sync logic if necessary + } + } + + // Restore into topology + s.activeTopology.RestoreMaintenanceTask(task.ID, task.VolumeID, topology.TaskType(string(taskType)), status, sources, destinations, estimatedSize) +} diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go index 69762c47c..9e0964188 100644 --- a/weed/admin/maintenance/maintenance_manager.go +++ b/weed/admin/maintenance/maintenance_manager.go @@ -558,10 +558,29 @@ func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error { mm.queue.policy = config.Policy mm.scanner.policy = config.Policy + // Propagate global policy changes to individual task configuration files + if config.Policy != nil { + mm.saveTaskConfigsFromPolicy(config.Policy) + } + glog.V(1).Infof("Maintenance configuration updated") return nil } +// saveTaskConfigsFromPolicy propagates global policy settings to separate task configuration files +func (mm *MaintenanceManager) saveTaskConfigsFromPolicy(policy *worker_pb.MaintenancePolicy) { + if mm.queue.persistence == nil || policy == nil { + return + } + + glog.V(1).Infof("Propagating maintenance policy changes to separate task configs") + for taskType, taskPolicy := range policy.TaskPolicies { + if err := mm.queue.persistence.SaveTaskPolicy(taskType, taskPolicy); err != nil { + glog.Errorf("Failed to save task policy for %s: %v", taskType, err) + } + } +} + // CancelTask cancels a pending task func (mm *MaintenanceManager) CancelTask(taskID string) error { mm.queue.mutex.Lock() diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go index 7b7e83818..9970e12f1 100644 --- a/weed/admin/maintenance/maintenance_types.go +++ b/weed/admin/maintenance/maintenance_types.go @@ -180,6 +180,9 @@ type TaskPersistence interface { LoadAllTaskStates() ([]*MaintenanceTask, error) DeleteTaskState(taskID string) error CleanupCompletedTasks() error + + // Policy persistence + SaveTaskPolicy(taskType string, policy *TaskPolicy) error } // Default configuration values diff --git a/weed/admin/topology/capacity.go b/weed/admin/topology/capacity.go index a595ed369..4b5e20843 100644 --- a/weed/admin/topology/capacity.go +++ b/weed/admin/topology/capacity.go @@ -3,6 +3,7 @@ package topology import ( "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -83,6 +84,7 @@ func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, exclu var available []*DiskInfo + glog.V(2).Infof("GetDisksWithEffectiveCapacity checking %d disks for type %s, minCapacity %d", len(at.disks), taskType, minCapacity) for _, disk := range at.disks { if disk.NodeID == excludeNodeID { continue // Skip excluded node @@ -115,11 +117,24 @@ func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, exclu FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount, } diskCopy.DiskInfo = diskInfoCopy + diskCopy.DiskInfo.MaxVolumeCount = disk.DiskInfo.DiskInfo.MaxVolumeCount // Ensure Max is set available = append(available, &diskCopy) + } else { + glog.V(2).Infof("Disk %s:%d capacity %d < %d (Max:%d, Vol:%d)", disk.NodeID, disk.DiskInfo.DiskID, effectiveCapacity.VolumeSlots, minCapacity, disk.DiskInfo.DiskInfo.MaxVolumeCount, disk.DiskInfo.DiskInfo.VolumeCount) } + } else { + tasksInfo := "" + for _, t := range disk.pendingTasks { + tasksInfo += fmt.Sprintf("[P:%s,Vol:%d] ", t.TaskType, t.VolumeID) + } + for _, t := range disk.assignedTasks { + tasksInfo += fmt.Sprintf("[A:%s,Vol:%d] ", t.TaskType, t.VolumeID) + } + glog.V(2).Infof("Disk %s:%d unavailable. Load: %d, MaxLoad: %d. Tasks: %s", disk.NodeID, disk.DiskInfo.DiskID, len(disk.pendingTasks)+len(disk.assignedTasks), MaxConcurrentTasksPerDisk, tasksInfo) } } + glog.V(2).Infof("GetDisksWithEffectiveCapacity found %d available disks", len(available)) return available } diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go index ada60248b..f56694a37 100644 --- a/weed/admin/topology/task_management.go +++ b/weed/admin/topology/task_management.go @@ -195,12 +195,67 @@ func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error { at.pendingTasks[spec.TaskID] = task at.assignTaskToDisk(task) - glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations", - spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations)) + return nil +} + +// RestoreMaintenanceTask restores a task from persistent storage into the active topology +func (at *ActiveTopology) RestoreMaintenanceTask(taskID string, volumeID uint32, taskType TaskType, status TaskStatus, sources []TaskSource, destinations []TaskDestination, estimatedSize int64) error { + at.mutex.Lock() + defer at.mutex.Unlock() + + task := &taskState{ + VolumeID: volumeID, + TaskType: taskType, + Status: status, + StartedAt: time.Now(), // Fallback if not provided, will be updated by heartbeats + EstimatedSize: estimatedSize, + Sources: sources, + Destinations: destinations, + } + + if status == TaskStatusInProgress { + at.assignedTasks[taskID] = task + } else if status == TaskStatusPending { + at.pendingTasks[taskID] = task + } else { + return nil // Ignore other statuses for topology tracking + } + + // Re-register task with disks for capacity tracking + at.assignTaskToDisk(task) + + glog.V(1).Infof("Restored %s task %s in topology: volume %d, %d sources, %d destinations", + taskType, taskID, volumeID, len(sources), len(destinations)) return nil } +// HasTask checks if there is any pending or assigned task for the given volume and task type. +// If taskType is TaskTypeNone, it checks for ANY task type. +func (at *ActiveTopology) HasTask(volumeID uint32, taskType TaskType) bool { + at.mutex.RLock() + defer at.mutex.RUnlock() + + for _, task := range at.pendingTasks { + if task.VolumeID == volumeID && (taskType == TaskTypeNone || task.TaskType == taskType) { + return true + } + } + + for _, task := range at.assignedTasks { + if task.VolumeID == volumeID && (taskType == TaskTypeNone || task.TaskType == taskType) { + return true + } + } + + return false +} + +// HasAnyTask checks if there is any pending or assigned task for the given volume across all types. +func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool { + return at.HasTask(volumeID, TaskTypeNone) +} + // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { switch taskType { diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go index df0103529..4b76de69f 100644 --- a/weed/admin/topology/types.go +++ b/weed/admin/topology/types.go @@ -10,6 +10,7 @@ type TaskStatus string // Common task type constants const ( + TaskTypeNone TaskType = "" TaskTypeVacuum TaskType = "vacuum" TaskTypeBalance TaskType = "balance" TaskTypeErasureCoding TaskType = "erasure_coding" @@ -27,11 +28,11 @@ const ( const ( // MaxConcurrentTasksPerDisk defines the maximum number of concurrent tasks per disk // This prevents overloading a single disk with too many simultaneous operations - MaxConcurrentTasksPerDisk = 2 + MaxConcurrentTasksPerDisk = 10 // MaxTotalTaskLoadPerDisk defines the maximum total task load (pending + active) per disk // This allows more tasks to be queued but limits the total pipeline depth - MaxTotalTaskLoadPerDisk = 3 + MaxTotalTaskLoadPerDisk = 20 // MaxTaskLoadForECPlacement defines the maximum task load to consider a disk for EC placement // This threshold ensures disks aren't overloaded when planning EC operations diff --git a/weed/admin/view/components/form_fields.templ b/weed/admin/view/components/form_fields.templ index 82d20d407..a2c79c9a7 100644 --- a/weed/admin/view/components/form_fields.templ +++ b/weed/admin/view/components/form_fields.templ @@ -236,6 +236,14 @@ templ DurationInputField(data DurationInputFieldData) { name={ data.Name + "_unit" } style="max-width: 120px;" > + ") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 97, ">Hours ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } if data.Description != "" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 98, "
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 100, "
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var49 string templ_7745c5c3_Var49, templ_7745c5c3_Err = templ.JoinStringErrs(data.Description) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/components/form_fields.templ`, Line: 266, Col: 55} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/components/form_fields.templ`, Line: 274, Col: 55} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var49)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 99, "
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 101, "
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 100, "") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 102, "") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } @@ -1100,6 +1110,11 @@ func getIntDisplayUnit(seconds int) string { return "minutes" } + // Preserve seconds when not divisible by minutes + if seconds < 60 || seconds%60 != 0 { + return "seconds" + } + // Check if it's evenly divisible by days if seconds%(24*3600) == 0 { return "days" @@ -1119,6 +1134,11 @@ func convertSecondsToUnit(seconds int) string { return "minutes" } + // Preserve seconds when not divisible by minutes + if seconds < 60 || seconds%60 != 0 { + return "seconds" + } + // Try days first if seconds%(24*3600) == 0 && seconds >= 24*3600 { return "days" @@ -1145,6 +1165,8 @@ func convertSecondsToValue(seconds int, unit string) float64 { return float64(seconds / 3600) case "minutes": return float64(seconds / 60) + case "seconds": + return float64(seconds) default: return float64(seconds / 60) // Default to minutes } @@ -1178,181 +1200,191 @@ func IntervalField(data IntervalFieldData) templ.Component { templ_7745c5c3_Var50 = templ.NopComponent } ctx = templ.ClearChildren(ctx) - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 101, "
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 127, "") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/worker/client.go b/weed/worker/client.go index 812308636..398172498 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -832,8 +832,8 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task } // Wait for task assignment - glog.V(3).Infof("WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID) - timeout := time.NewTimer(5 * time.Second) + glog.V(3).Infof("WAITING FOR RESPONSE: Worker %s waiting for task assignment response (30s timeout)", workerID) + timeout := time.NewTimer(30 * time.Second) defer timeout.Stop() for { @@ -841,6 +841,12 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task case response := <-c.incoming: glog.V(3).Infof("RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message) if taskAssign := response.GetTaskAssignment(); taskAssign != nil { + // Validate TaskId is not empty before processing + if taskAssign.TaskId == "" { + glog.Warningf("Worker %s received task assignment with empty TaskId, ignoring", workerID) + continue + } + glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)", workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId) @@ -862,7 +868,7 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task glog.V(3).Infof("NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message) } case <-timeout.C: - glog.V(3).Infof("TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID) + glog.V(3).Infof("TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 30 seconds", workerID) return nil, nil // No task available } } diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index c9f4ebfd5..40950b3be 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -100,6 +100,12 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Plan destination if ActiveTopology is available if clusterInfo.ActiveTopology != nil { + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { + glog.V(2).Infof("BALANCE: Skipping volume %d, task already exists in ActiveTopology", selectedVolume.VolumeID) + return nil, nil + } + destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) if err != nil { glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) diff --git a/weed/worker/tasks/erasure_coding/config.go b/weed/worker/tasks/erasure_coding/config.go index 1f70fb8db..b649f065f 100644 --- a/weed/worker/tasks/erasure_coding/config.go +++ b/weed/worker/tasks/erasure_coding/config.go @@ -86,7 +86,7 @@ func GetConfigSpec() base.ConfigSpec { JSONName: "quiet_for_seconds", Type: config.FieldTypeInterval, DefaultValue: 300, - MinValue: 60, + MinValue: 1, MaxValue: 3600, Required: true, DisplayName: "Quiet Period", @@ -102,7 +102,7 @@ func GetConfigSpec() base.ConfigSpec { JSONName: "fullness_ratio", Type: config.FieldTypeFloat, DefaultValue: 0.8, - MinValue: 0.1, + MinValue: 0.0001, MaxValue: 1.0, Required: true, DisplayName: "Fullness Ratio", diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index f1ac3f626..88f8a017a 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -34,7 +34,22 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI skippedQuietTime := 0 skippedFullness := 0 + // Group metrics by VolumeID to handle replicas and select canonical server + volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics) for _, metric := range metrics { + volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric) + } + + // Iterate over groups to check criteria and creation tasks + for _, groupMetrics := range volumeGroups { + // Find canonical metric (lowest Server ID) to ensure consistent task deduplication + metric := groupMetrics[0] + for _, m := range groupMetrics { + if m.Server < metric.Server { + metric = m + } + } + // Skip if already EC volume if metric.IsECVolume { skippedAlreadyEC++ @@ -83,6 +98,12 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Plan EC destinations if ActiveTopology is available if clusterInfo.ActiveTopology != nil { + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { + glog.V(2).Infof("EC Detection: Skipping volume %d, task already exists in ActiveTopology", metric.VolumeID) + continue + } + glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID) multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig) if err != nil { diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index da59a4a7f..cd0e44e69 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -47,6 +47,14 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI ScheduleAt: time.Now(), } + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + if clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { + glog.V(2).Infof("VACUUM: Skipping volume %d, task already exists in ActiveTopology", metric.VolumeID) + continue + } + } + // Create typed parameters for vacuum task result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo) if result.TypedParams != nil { diff --git a/weed/worker/types/task_ui.go b/weed/worker/types/task_ui.go index 9294127a8..8a57e83be 100644 --- a/weed/worker/types/task_ui.go +++ b/weed/worker/types/task_ui.go @@ -12,6 +12,11 @@ func secondsToIntervalValueUnit(totalSeconds int) (int, string) { return 0, "minute" } + // Preserve seconds when not divisible by minutes + if totalSeconds < 60 || totalSeconds%60 != 0 { + return totalSeconds, "second" + } + // Check if it's evenly divisible by days if totalSeconds%(24*3600) == 0 { return totalSeconds / (24 * 3600), "day" @@ -35,6 +40,8 @@ func IntervalValueUnitToSeconds(value int, unit string) int { return value * 3600 case "minute": return value * 60 + case "second": + return value default: return value * 60 // Default to minutes } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index 5c7ebfd86..e44234f33 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -947,6 +947,10 @@ func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) { w.handleTaskLogRequest(msg.TaskLogRequest) case *worker_pb.AdminMessage_TaskAssignment: taskAssign := msg.TaskAssignment + if taskAssign.TaskId == "" { + glog.V(1).Infof("Worker %s received empty task assignment, going to sleep", w.id) + return + } glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)", w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)