diff --git a/telemetry/test/integration.go b/telemetry/test/integration.go index 2b79bdbc6..463806f15 100644 --- a/telemetry/test/integration.go +++ b/telemetry/test/integration.go @@ -148,7 +148,7 @@ func waitForServer(url string, timeout time.Duration) bool { func testProtobufMarshaling() error { // Test protobuf marshaling/unmarshaling testData := &proto.TelemetryData{ - ClusterId: "test-cluster-12345", + TopologyId: "test-cluster-12345", Version: "test-3.45", Os: "linux/amd64", VolumeServerCount: 2, diff --git a/test/postgres/client.go b/test/postgres/client.go index 3bf1a0007..70c81b127 100644 --- a/test/postgres/client.go +++ b/test/postgres/client.go @@ -1,3 +1,6 @@ +//go:build ignore +// +build ignore + package main import ( @@ -11,9 +14,22 @@ import ( _ "github.com/lib/pq" ) +const defaultPostgresHost = "postgres-server" + +func getEnv(key, def string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return def +} + func main() { + clientMain() +} + +func clientMain() { // Get PostgreSQL connection details from environment - host := getEnv("POSTGRES_HOST", "localhost") + host := getEnv("POSTGRES_HOST", defaultPostgresHost) port := getEnv("POSTGRES_PORT", "5432") user := getEnv("POSTGRES_USER", "seaweedfs") dbname := getEnv("POSTGRES_DB", "default") @@ -102,7 +118,7 @@ func testSystemInfo(db *sql.DB) error { } // Use individual connections for each query to avoid protocol issues - connStr := getEnv("POSTGRES_HOST", "postgres-server") + connStr := getEnv("POSTGRES_HOST", defaultPostgresHost) port := getEnv("POSTGRES_PORT", "5432") user := getEnv("POSTGRES_USER", "seaweedfs") dbname := getEnv("POSTGRES_DB", "logs") @@ -118,12 +134,12 @@ func testSystemInfo(db *sql.DB) error { log.Printf(" Query '%s' failed to connect: %v", q.query, err) continue } - defer tempDB.Close() var result string err = tempDB.QueryRow(q.query).Scan(&result) if err != nil { log.Printf(" Query '%s' failed: %v", q.query, err) + tempDB.Close() continue } log.Printf(" %s: %s", q.name, result) @@ -313,7 +329,7 @@ func testDatabaseSwitching(db *sql.DB) error { databases := []string{"analytics", "ecommerce", "logs"} // Use fresh connections to avoid protocol issues - connStr := getEnv("POSTGRES_HOST", "postgres-server") + connStr := getEnv("POSTGRES_HOST", defaultPostgresHost) port := getEnv("POSTGRES_PORT", "5432") user := getEnv("POSTGRES_USER", "seaweedfs") @@ -328,7 +344,6 @@ func testDatabaseSwitching(db *sql.DB) error { log.Printf(" Could not connect to '%s': %v", dbName, err) continue } - defer tempDB.Close() // Test the connection by executing a simple query var newDB string @@ -372,7 +387,7 @@ func testSystemColumns(db *sql.DB) error { // Use fresh connection to avoid protocol state issues connStr := fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable", - getEnv("POSTGRES_HOST", "postgres-server"), + getEnv("POSTGRES_HOST", defaultPostgresHost), getEnv("POSTGRES_PORT", "5432"), getEnv("POSTGRES_USER", "seaweedfs"), getEnv("POSTGRES_DB", "logs")) @@ -382,7 +397,6 @@ func testSystemColumns(db *sql.DB) error { log.Printf(" Could not create connection: %v", err) continue } - defer tempDB.Close() // First check if table exists and has data (safer than COUNT which was causing crashes) rows, err := tempDB.Query(fmt.Sprintf("SELECT id FROM %s LIMIT 1", table)) @@ -444,7 +458,6 @@ func testComplexQueries(db *sql.DB) error { log.Printf(" Could not create connection: %v", err) continue } - defer tempDB.Close() // Test basic SELECT with LIMIT (avoid COUNT which was causing crashes) rows, err := tempDB.Query(fmt.Sprintf("SELECT id FROM %s LIMIT 5", table)) @@ -469,38 +482,36 @@ func testComplexQueries(db *sql.DB) error { // Test WHERE clause with known ID (safer than arbitrary conditions) testID := ids[0] rows, err = tempDB.Query(fmt.Sprintf("SELECT id FROM %s WHERE id = %d", table, testID)) - if err == nil { + if err != nil { + log.Printf(" WHERE query failed: %v", err) + tempDB.Close() + return fmt.Errorf("WHERE clause test failed for table '%s'", table) + } + + foundMatch := false + if rows.Next() { var foundID int64 - if rows.Next() { - if err := rows.Scan(&foundID); err == nil && foundID == testID { - log.Printf(" ✓ WHERE clause working: found record with ID %d", foundID) - } + if err := rows.Scan(&foundID); err == nil && foundID == testID { + log.Printf(" ✓ WHERE clause working: found record with ID %d", foundID) + foundMatch = true } - rows.Close() + } + rows.Close() + + if !foundMatch { + tempDB.Close() + return fmt.Errorf("WHERE clause test failed: no matching record found for ID %d in table '%s'", testID, table) } log.Printf(" ✓ Complex queries test passed for '%s'", table) tempDB.Close() return nil + } else { + tempDB.Close() + return fmt.Errorf("no records found in table '%s' - cannot test WHERE clause", table) } - - tempDB.Close() } log.Println(" Complex queries test completed - avoided crash-prone patterns") return nil } - -func stringOrNull(ns sql.NullString) string { - if ns.Valid { - return ns.String - } - return "NULL" -} - -func getEnv(key, defaultValue string) string { - if value, exists := os.LookupEnv(key); exists { - return value - } - return defaultValue -} diff --git a/test/postgres/producer.go b/test/postgres/producer.go index 2d49519e8..0d7b9763f 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -322,7 +322,7 @@ func discoverFiler(masterHTTPAddress string) (string, error) { // Use the first available filer and convert HTTP address to gRPC filerHTTPAddress := resp.ClusterNodes[0].Address - httpAddr := pb.ServerAddress(filerHTTPAddress) + httpAddr = pb.ServerAddress(filerHTTPAddress) return httpAddr.ToGrpcAddress(), nil } diff --git a/test/volume_server/grpc/move_tail_timestamp_test.go b/test/volume_server/grpc/move_tail_timestamp_test.go index 416235133..8d5e01a47 100644 --- a/test/volume_server/grpc/move_tail_timestamp_test.go +++ b/test/volume_server/grpc/move_tail_timestamp_test.go @@ -210,6 +210,7 @@ func TestVolumeMoveHandlesInFlightWrites(t *testing.T) { var writesMu sync.Mutex var writes []written + var uploadErr error writeCtx, writeCancel := context.WithCancel(context.Background()) var writerWG sync.WaitGroup writerWG.Add(1) @@ -227,7 +228,8 @@ func TestVolumeMoveHandlesInFlightWrites(t *testing.T) { resp := framework.UploadBytes(t, client, cluster.VolumeAdminURL(0), liveFid, livePayload) _ = framework.ReadAllAndClose(t, resp) if resp.StatusCode != http.StatusCreated { - t.Fatalf("live upload failed: %d", resp.StatusCode) + uploadErr = fmt.Errorf("live upload failed: %d", resp.StatusCode) + return } writesMu.Lock() writes = append(writes, written{fid: liveFid, data: livePayload}) @@ -253,6 +255,10 @@ func TestVolumeMoveHandlesInFlightWrites(t *testing.T) { writeCancel() writerWG.Wait() + if uploadErr != nil { + t.Fatalf("upload goroutine error: %v", uploadErr) + } + writesMu.Lock() sampleCount := len(writes) if sampleCount == 0 { diff --git a/weed/command/mini.go b/weed/command/mini.go index 051d319da..95cc2f0aa 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -1186,7 +1186,7 @@ func startMiniPluginWorker(ctx context.Context) { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") - handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption, int(pluginworker.DefaultMaxExecutionConcurrency)) + handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption, int(pluginworker.DefaultMaxExecutionConcurrency), workerDir) if err != nil { glog.Fatalf("Failed to build mini plugin worker handlers: %v", err) } diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index 56f4f38c4..ed4fe91f3 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -19,7 +19,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) - handler, err := buildPluginWorkerHandler("vacuum", dialOption, testMaxConcurrency) + handler, err := buildPluginWorkerHandler("vacuum", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(vacuum) err = %v", err) } @@ -27,7 +27,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil handler") } - handler, err = buildPluginWorkerHandler("", dialOption, testMaxConcurrency) + handler, err = buildPluginWorkerHandler("", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(default) err = %v", err) } @@ -35,7 +35,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil default handler") } - handler, err = buildPluginWorkerHandler("volume_balance", dialOption, testMaxConcurrency) + handler, err = buildPluginWorkerHandler("volume_balance", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(volume_balance) err = %v", err) } @@ -43,7 +43,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil volume_balance handler") } - handler, err = buildPluginWorkerHandler("balance", dialOption, testMaxConcurrency) + handler, err = buildPluginWorkerHandler("balance", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(balance alias) err = %v", err) } @@ -51,7 +51,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil balance alias handler") } - handler, err = buildPluginWorkerHandler("erasure_coding", dialOption, testMaxConcurrency) + handler, err = buildPluginWorkerHandler("erasure_coding", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(erasure_coding) err = %v", err) } @@ -59,7 +59,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil erasure_coding handler") } - handler, err = buildPluginWorkerHandler("ec", dialOption, testMaxConcurrency) + handler, err = buildPluginWorkerHandler("ec", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandler(ec alias) err = %v", err) } @@ -67,7 +67,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil ec alias handler") } - _, err = buildPluginWorkerHandler("unknown", dialOption, testMaxConcurrency) + _, err = buildPluginWorkerHandler("unknown", dialOption, testMaxConcurrency, "") if err == nil { t.Fatalf("expected unsupported job type error") } @@ -78,7 +78,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) - handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption, testMaxConcurrency) + handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandlers(list) err = %v", err) } @@ -86,7 +86,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { t.Fatalf("expected 3 handlers, got %d", len(handlers)) } - handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption, testMaxConcurrency) + handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption, testMaxConcurrency, "") if err != nil { t.Fatalf("buildPluginWorkerHandlers(aliases) err = %v", err) } @@ -94,7 +94,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { t.Fatalf("expected deduped 3 handlers, got %d", len(handlers)) } - _, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption, testMaxConcurrency) + _, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption, testMaxConcurrency, "") if err == nil { t.Fatalf("expected unsupported job type error") } diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index 27c381d4a..180d35140 100644 --- a/weed/command/worker_runtime.go +++ b/weed/command/worker_runtime.go @@ -81,7 +81,7 @@ func runPluginWorkerWithOptions(options pluginWorkerRunOptions) bool { return false } - handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption, options.MaxExecute) + handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption, options.MaxExecute, options.WorkingDir) if err != nil { glog.Errorf("Failed to build plugin worker handlers: %v", err) return false @@ -163,7 +163,7 @@ func resolvePluginWorkerID(explicitID string, workingDir string) (string, error) // The scheduler's effective per-worker MaxExecutionConcurrency is derived from // the worker-level configuration (e.g. WorkerOptions.MaxExecutionConcurrency), // not directly from the handler's Capability. -func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExecute int) (pluginworker.JobHandler, error) { +func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExecute int, workingDir string) (pluginworker.JobHandler, error) { canonicalJobType, err := canonicalPluginWorkerJobType(jobType) if err != nil { return nil, err @@ -175,7 +175,7 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExe case "volume_balance": return pluginworker.NewVolumeBalanceHandler(dialOption), nil case "erasure_coding": - return pluginworker.NewErasureCodingHandler(dialOption), nil + return pluginworker.NewErasureCodingHandler(dialOption, workingDir), nil default: return nil, fmt.Errorf("unsupported plugin job type %q", canonicalJobType) } @@ -183,7 +183,7 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExe // buildPluginWorkerHandlers constructs a deduplicated slice of JobHandlers for // the comma-separated jobTypes string, forwarding maxExecute to each handler. -func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int) ([]pluginworker.JobHandler, error) { +func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int, workingDir string) ([]pluginworker.JobHandler, error) { parsedJobTypes, err := parsePluginWorkerJobTypes(jobTypes) if err != nil { return nil, err @@ -191,7 +191,7 @@ func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxE handlers := make([]pluginworker.JobHandler, 0, len(parsedJobTypes)) for _, jobType := range parsedJobTypes { - handler, buildErr := buildPluginWorkerHandler(jobType, dialOption, maxExecute) + handler, buildErr := buildPluginWorkerHandler(jobType, dialOption, maxExecute, workingDir) if buildErr != nil { return nil, buildErr } diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 70ec51be8..0d72aef3f 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -3,7 +3,6 @@ package pluginworker import ( "context" "fmt" - "os" "path/filepath" "strings" "time" @@ -27,10 +26,11 @@ type erasureCodingWorkerConfig struct { // ErasureCodingHandler is the plugin job handler for erasure coding. type ErasureCodingHandler struct { grpcDialOption grpc.DialOption + workingDir string } -func NewErasureCodingHandler(grpcDialOption grpc.DialOption) *ErasureCodingHandler { - return &ErasureCodingHandler{grpcDialOption: grpcDialOption} +func NewErasureCodingHandler(grpcDialOption grpc.DialOption, workingDir string) *ErasureCodingHandler { + return &ErasureCodingHandler{grpcDialOption: grpcDialOption, workingDir: strings.TrimSpace(workingDir)} } func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability { @@ -228,24 +228,21 @@ func (h *ErasureCodingHandler) Detect( } clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} - results, err := erasurecodingtask.Detection(metrics, clusterInfo, workerConfig.TaskConfig) + maxResults := int(request.MaxResults) + if maxResults < 0 { + maxResults = 0 + } + results, hasMore, err := erasurecodingtask.Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } - if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { + if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil { glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr) } - maxResults := int(request.MaxResults) - hasMore := false - if maxResults > 0 && len(results) > maxResults { - hasMore = true - results = results[:maxResults] - } - proposals := make([]*plugin_pb.JobProposal, 0, len(results)) for _, result := range results { - proposal, proposalErr := buildErasureCodingProposal(result) + proposal, proposalErr := buildErasureCodingProposal(result, h.workingDir) if proposalErr != nil { glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", proposalErr) continue @@ -273,6 +270,8 @@ func emitErasureCodingDetectionDecisionTrace( metrics []*workertypes.VolumeHealthMetrics, taskConfig *erasurecodingtask.Config, results []*workertypes.TaskDetectionResult, + maxResults int, + hasMore bool, ) error { if sender == nil || taskConfig == nil { return nil @@ -280,15 +279,7 @@ func emitErasureCodingDetectionDecisionTrace( quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024 - allowedCollections := make(map[string]bool) - if strings.TrimSpace(taskConfig.CollectionFilter) != "" { - for _, collection := range strings.Split(taskConfig.CollectionFilter, ",") { - trimmed := strings.TrimSpace(collection) - if trimmed != "" { - allowedCollections[trimmed] = true - } - } - } + allowedCollections := erasurecodingtask.ParseCollectionFilter(taskConfig.CollectionFilter) volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics) for _, metric := range metrics { @@ -341,11 +332,16 @@ func emitErasureCodingDetectionDecisionTrace( } totalVolumes := len(metrics) + summarySuffix := "" + if hasMore { + summarySuffix = fmt.Sprintf(" (max_results=%d reached; remaining volumes not evaluated)", maxResults) + } summaryMessage := "" if len(results) == 0 { summaryMessage = fmt.Sprintf( - "EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", + "EC detection: No tasks created for %d volumes%s (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", totalVolumes, + summarySuffix, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, @@ -354,8 +350,9 @@ func emitErasureCodingDetectionDecisionTrace( ) } else { summaryMessage = fmt.Sprintf( - "EC detection: Created %d task(s) from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", + "EC detection: Created %d task(s)%s from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", len(results), + summarySuffix, totalVolumes, skippedAlreadyEC, skippedTooSmall, @@ -372,6 +369,12 @@ func emitErasureCodingDetectionDecisionTrace( "selected_tasks": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))}, }, + "max_results": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxResults)}, + }, + "has_more": { + Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore}, + }, "skipped_already_ec": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedAlreadyEC)}, }, @@ -470,7 +473,7 @@ func (h *ErasureCodingHandler) Execute( return err } - applyErasureCodingExecutionDefaults(params, request.GetClusterContext()) + applyErasureCodingExecutionDefaults(params, request.GetClusterContext(), h.workingDir) if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" { return fmt.Errorf("erasure coding source node is required") @@ -607,6 +610,7 @@ func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) * func buildErasureCodingProposal( result *workertypes.TaskDetectionResult, + baseWorkingDir string, ) (*plugin_pb.JobProposal, error) { if result == nil { return nil, fmt.Errorf("task detection result is nil") @@ -615,7 +619,7 @@ func buildErasureCodingProposal( return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID) } params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams) - applyErasureCodingExecutionDefaults(params, nil) + applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir) paramsPayload, err := proto.Marshal(params) if err != nil { @@ -766,6 +770,7 @@ func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParam func applyErasureCodingExecutionDefaults( params *worker_pb.TaskParams, clusterContext *plugin_pb.ClusterContext, + baseWorkingDir string, ) { if params == nil { return @@ -786,7 +791,7 @@ func applyErasureCodingExecutionDefaults( if ecParams.ParityShards <= 0 { ecParams.ParityShards = ecstorage.ParityShardsCount } - ecParams.WorkingDir = defaultErasureCodingWorkingDir() + ecParams.WorkingDir = defaultErasureCodingWorkingDir(baseWorkingDir) ecParams.CleanupSource = true if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 { ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0] @@ -897,6 +902,10 @@ func assignECShardIDs(totalShards int, targetCount int) [][]uint32 { return assignments } -func defaultErasureCodingWorkingDir() string { - return filepath.Join(os.TempDir(), "seaweedfs-ec") +func defaultErasureCodingWorkingDir(baseWorkingDir string) string { + dir := strings.TrimSpace(baseWorkingDir) + if dir == "" { + return filepath.Join(".", "seaweedfs-ec") + } + return filepath.Join(dir, "seaweedfs-ec") } diff --git a/weed/plugin/worker/erasure_coding_handler_test.go b/weed/plugin/worker/erasure_coding_handler_test.go index 28342c135..5b5e147c7 100644 --- a/weed/plugin/worker/erasure_coding_handler_test.go +++ b/weed/plugin/worker/erasure_coding_handler_test.go @@ -179,7 +179,7 @@ func TestBuildErasureCodingProposal(t *testing.T) { TypedParams: params, } - proposal, err := buildErasureCodingProposal(result) + proposal, err := buildErasureCodingProposal(result, "") if err != nil { t.Fatalf("buildErasureCodingProposal() err = %v", err) } @@ -195,7 +195,7 @@ func TestBuildErasureCodingProposal(t *testing.T) { } func TestErasureCodingHandlerRejectsUnsupportedJobType(t *testing.T) { - handler := NewErasureCodingHandler(nil) + handler := NewErasureCodingHandler(nil, "") err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: "vacuum", }, noopDetectionSender{}) @@ -212,7 +212,7 @@ func TestErasureCodingHandlerRejectsUnsupportedJobType(t *testing.T) { } func TestErasureCodingHandlerDetectSkipsByMinInterval(t *testing.T) { - handler := NewErasureCodingHandler(nil) + handler := NewErasureCodingHandler(nil, "") sender := &recordingDetectionSender{} err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: "erasure_coding", @@ -269,7 +269,7 @@ func TestEmitErasureCodingDetectionDecisionTraceNoTasks(t *testing.T) { }, } - if err := emitErasureCodingDetectionDecisionTrace(sender, metrics, config, nil); err != nil { + if err := emitErasureCodingDetectionDecisionTrace(sender, metrics, config, nil, 0, false); err != nil { t.Fatalf("emitErasureCodingDetectionDecisionTrace error: %v", err) } if len(sender.events) < 4 { @@ -288,7 +288,7 @@ func TestEmitErasureCodingDetectionDecisionTraceNoTasks(t *testing.T) { } func TestErasureCodingDescriptorOmitsLocalExecutionFields(t *testing.T) { - descriptor := NewErasureCodingHandler(nil).Descriptor() + descriptor := NewErasureCodingHandler(nil, "").Descriptor() if descriptor == nil || descriptor.WorkerConfigForm == nil { t.Fatalf("expected worker config form in descriptor") } @@ -301,6 +301,7 @@ func TestErasureCodingDescriptorOmitsLocalExecutionFields(t *testing.T) { } func TestApplyErasureCodingExecutionDefaultsForcesLocalFields(t *testing.T) { + baseWorkingDir := "/var/lib/seaweedfs-worker" params := &worker_pb.TaskParams{ TaskId: "ec-test", VolumeId: 100, @@ -314,14 +315,14 @@ func TestApplyErasureCodingExecutionDefaultsForcesLocalFields(t *testing.T) { }, } - applyErasureCodingExecutionDefaults(params, nil) + applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir) ecParams := params.GetErasureCodingParams() if ecParams == nil { t.Fatalf("expected erasure coding params") } - if ecParams.WorkingDir != defaultErasureCodingWorkingDir() { - t.Fatalf("expected local working_dir %q, got %q", defaultErasureCodingWorkingDir(), ecParams.WorkingDir) + if ecParams.WorkingDir != defaultErasureCodingWorkingDir(baseWorkingDir) { + t.Fatalf("expected local working_dir %q, got %q", defaultErasureCodingWorkingDir(baseWorkingDir), ecParams.WorkingDir) } if !ecParams.CleanupSource { t.Fatalf("expected cleanup_source true") diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 2bc61469c..7ea199403 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -1,7 +1,9 @@ package erasure_coding import ( + "context" "fmt" + "sort" "strings" "time" @@ -15,14 +17,21 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Detection implements the detection logic for erasure coding tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { +// Detection implements the detection logic for erasure coding tasks. +// It respects ctx cancellation and can stop early once maxResults is reached. +func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) { if !config.IsEnabled() { - return nil, nil + return nil, false, nil + } + + if maxResults < 0 { + maxResults = 0 } ecConfig := config.(*Config) var results []*types.TaskDetectionResult + hasMore := false + stoppedEarly := false now := time.Now() quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum @@ -34,14 +43,44 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI skippedQuietTime := 0 skippedFullness := 0 + var planner *ecPlacementPlanner + + allowedCollections := ParseCollectionFilter(ecConfig.CollectionFilter) + // Group metrics by VolumeID to handle replicas and select canonical server volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics) for _, metric := range metrics { + if ctx != nil { + if err := ctx.Err(); err != nil { + return results, hasMore, err + } + } volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric) } + groupKeys := make([]uint32, 0, len(volumeGroups)) + for volumeID := range volumeGroups { + groupKeys = append(groupKeys, volumeID) + } + sort.Slice(groupKeys, func(i, j int) bool { return groupKeys[i] < groupKeys[j] }) + // Iterate over groups to check criteria and creation tasks - for _, groupMetrics := range volumeGroups { + for idx, volumeID := range groupKeys { + if ctx != nil { + if err := ctx.Err(); err != nil { + return results, hasMore, err + } + } + if maxResults > 0 && len(results) >= maxResults { + if idx+1 < len(groupKeys) { + hasMore = true + } + stoppedEarly = true + break + } + + groupMetrics := volumeGroups[volumeID] + // Find canonical metric (lowest Server ID) to ensure consistent task deduplication metric := groupMetrics[0] for _, m := range groupMetrics { @@ -63,12 +102,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } // Check collection filter if specified - if ecConfig.CollectionFilter != "" { - // Parse comma-separated collections - allowedCollections := make(map[string]bool) - for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") { - allowedCollections[strings.TrimSpace(collection)] = true - } + if len(allowedCollections) > 0 { // Skip if volume's collection is not in the allowed list if !allowedCollections[metric.Collection] { skippedCollectionFilter++ @@ -78,6 +112,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Check quiet duration and fullness criteria if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio { + if ctx != nil { + if err := ctx.Err(); err != nil { + return results, hasMore, err + } + } glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID) // Generate task ID for ActiveTopology integration @@ -105,7 +144,10 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID) - multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig) + if planner == nil { + planner = newECPlacementPlanner(clusterInfo.ActiveTopology) + } + multiPlan, err := planECDestinations(planner, metric, ecConfig) if err != nil { glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) continue // Skip this volume if destination planning fails @@ -189,6 +231,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } } + // Convert sources before mutating topology + sourcesProto, err := convertTaskSourcesToProtobuf(sources, metric.VolumeID, clusterInfo.ActiveTopology) + if err != nil { + glog.Warningf("Failed to convert sources for EC task on volume %d: %v, skipping", metric.VolumeID, err) + continue + } + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ TaskID: taskID, TaskType: topology.TaskTypeErasureCoding, @@ -202,16 +251,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI continue // Skip this volume if topology task addition fails } + if planner != nil { + planner.applyTaskReservations(int64(metric.Size), sources, destinations) + } + glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations", taskID, metric.VolumeID, len(sources), len(multiPlan.Plans)) - // Convert sources - sourcesProto, err := convertTaskSourcesToProtobuf(sources, metric.VolumeID, clusterInfo.ActiveTopology) - if err != nil { - glog.Warningf("Failed to convert sources for EC task on volume %d: %v, skipping", metric.VolumeID, err) - continue - } - // Create unified sources and targets for EC task result.TypedParams = &worker_pb.TaskParams{ TaskId: taskID, // Link to ActiveTopology pending task @@ -256,7 +302,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } // Log debug summary if no tasks were created - if len(results) == 0 && len(metrics) > 0 { + if len(results) == 0 && len(metrics) > 0 && !stoppedEarly { totalVolumes := len(metrics) glog.Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)", totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness) @@ -273,12 +319,197 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } } - return results, nil + return results, hasMore, nil +} + +func ParseCollectionFilter(filter string) map[string]bool { + allowed := make(map[string]bool) + for _, collection := range strings.Split(filter, ",") { + trimmed := strings.TrimSpace(collection) + if trimmed != "" { + allowed[trimmed] = true + } + } + if len(allowed) == 0 { + return nil + } + return allowed +} + +type ecDiskState struct { + baseAvailable int64 + reservedVolumes int32 + reservedShardSlots int32 +} + +type ecPlacementPlanner struct { + activeTopology *topology.ActiveTopology + candidates []*placement.DiskCandidate + candidateByKey map[string]*placement.DiskCandidate + diskStates map[string]*ecDiskState +} + +func newECPlacementPlanner(activeTopology *topology.ActiveTopology) *ecPlacementPlanner { + if activeTopology == nil { + return nil + } + + disks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 0) + candidates := diskInfosToCandidates(disks) + if len(candidates) == 0 { + return &ecPlacementPlanner{ + activeTopology: activeTopology, + candidates: candidates, + candidateByKey: map[string]*placement.DiskCandidate{}, + diskStates: map[string]*ecDiskState{}, + } + } + + candidateByKey := make(map[string]*placement.DiskCandidate, len(candidates)) + diskStates := make(map[string]*ecDiskState, len(candidates)) + for _, candidate := range candidates { + key := ecDiskKey(candidate.NodeID, candidate.DiskID) + candidateByKey[key] = candidate + diskStates[key] = &ecDiskState{ + baseAvailable: int64(candidate.FreeSlots), + } + } + + return &ecPlacementPlanner{ + activeTopology: activeTopology, + candidates: candidates, + candidateByKey: candidateByKey, + diskStates: diskStates, + } +} + +func (p *ecPlacementPlanner) selectDestinations(sourceRack, sourceDC string, shardsNeeded int) ([]*placement.DiskCandidate, error) { + if p == nil || p.activeTopology == nil { + return nil, fmt.Errorf("ec placement planner is not initialized") + } + if shardsNeeded <= 0 { + return nil, fmt.Errorf("invalid shardsNeeded %d", shardsNeeded) + } + + config := placement.PlacementRequest{ + ShardsNeeded: shardsNeeded, + MaxShardsPerServer: 0, + MaxShardsPerRack: 0, + MaxTaskLoad: topology.MaxTaskLoadForECPlacement, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := placement.SelectDestinations(p.candidates, config) + if err != nil { + return nil, err + } + return result.SelectedDisks, nil +} + +func (p *ecPlacementPlanner) applyTaskReservations(volumeSize int64, sources []topology.TaskSourceSpec, destinations []topology.TaskDestinationSpec) { + if p == nil { + return + } + + touched := make(map[string]bool) + + for _, source := range sources { + impact := p.sourceImpact(source, volumeSize) + p.applyImpact(source.ServerID, source.DiskID, impact) + p.bumpShardCount(source.ServerID, source.DiskID, impact.ShardSlots) + key := ecDiskKey(source.ServerID, source.DiskID) + if !touched[key] { + p.bumpLoad(source.ServerID, source.DiskID) + touched[key] = true + } + } + + for _, dest := range destinations { + impact := p.destinationImpact(dest, volumeSize) + p.applyImpact(dest.ServerID, dest.DiskID, impact) + p.bumpShardCount(dest.ServerID, dest.DiskID, impact.ShardSlots) + key := ecDiskKey(dest.ServerID, dest.DiskID) + if !touched[key] { + p.bumpLoad(dest.ServerID, dest.DiskID) + touched[key] = true + } + } +} + +func (p *ecPlacementPlanner) sourceImpact(source topology.TaskSourceSpec, volumeSize int64) topology.StorageSlotChange { + if source.StorageImpact != nil { + return *source.StorageImpact + } + if source.CleanupType == topology.CleanupECShards { + return topology.CalculateECShardCleanupImpact(volumeSize) + } + impact, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize) + return impact +} + +func (p *ecPlacementPlanner) destinationImpact(dest topology.TaskDestinationSpec, volumeSize int64) topology.StorageSlotChange { + if dest.StorageImpact != nil { + return *dest.StorageImpact + } + _, impact := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize) + return impact +} + +func (p *ecPlacementPlanner) applyImpact(nodeID string, diskID uint32, impact topology.StorageSlotChange) { + if impact.IsZero() { + return + } + key := ecDiskKey(nodeID, diskID) + state, ok := p.diskStates[key] + if !ok { + return + } + + state.reservedVolumes += impact.VolumeSlots + state.reservedShardSlots += impact.ShardSlots + + available := state.baseAvailable - int64(state.reservedVolumes) - int64(state.reservedShardSlots)/int64(topology.ShardsPerVolumeSlot) + if available < 0 { + available = 0 + } + + if candidate, ok := p.candidateByKey[key]; ok { + candidate.FreeSlots = int(available) + candidate.VolumeCount = candidate.MaxVolumeCount - available + } +} + +func (p *ecPlacementPlanner) bumpLoad(nodeID string, diskID uint32) { + key := ecDiskKey(nodeID, diskID) + if candidate, ok := p.candidateByKey[key]; ok { + candidate.LoadCount++ + } +} + +func (p *ecPlacementPlanner) bumpShardCount(nodeID string, diskID uint32, delta int32) { + if delta == 0 { + return + } + key := ecDiskKey(nodeID, diskID) + if candidate, ok := p.candidateByKey[key]; ok { + candidate.ShardCount += int(delta) + if candidate.ShardCount < 0 { + candidate.ShardCount = 0 + } + } +} + +func ecDiskKey(nodeID string, diskID uint32) string { + return fmt.Sprintf("%s:%d", nodeID, diskID) } // planECDestinations plans the destinations for erasure coding operation // This function implements EC destination planning logic directly in the detection phase -func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) { +func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) { + if planner == nil || planner.activeTopology == nil { + return nil, fmt.Errorf("active topology not available for EC placement") + } // Calculate expected shard size for EC operation expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount) @@ -286,7 +517,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V var sourceRack, sourceDC string // Extract rack and DC from topology info - topologyInfo := activeTopology.GetTopologyInfo() + topologyInfo := planner.activeTopology.GetTopologyInfo() if topologyInfo != nil { for _, dc := range topologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { @@ -307,17 +538,11 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V } } - // Get available disks for EC placement with effective capacity consideration (includes pending tasks) - // For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1 - // For EC, we need at least 1 available volume slot on a disk to consider it for placement. - // Note: We don't exclude the source server since the original volume will be deleted after EC conversion - availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1) - if len(availableDisks) < erasure_coding.MinTotalDisks { - return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks)) + // Select best disks for EC placement with rack/DC diversity using the cached planner + selectedDisks, err := planner.selectDestinations(sourceRack, sourceDC, erasure_coding.TotalShardsCount) + if err != nil { + return nil, err } - - // Select best disks for EC placement with rack/DC diversity - selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount) if len(selectedDisks) < erasure_coding.MinTotalDisks { return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks) } @@ -328,7 +553,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V for _, disk := range selectedDisks { // Get the target server address - targetAddress, err := util.ResolveServerAddress(disk.NodeID, activeTopology) + targetAddress, err := util.ResolveServerAddress(disk.NodeID, planner.activeTopology) if err != nil { return nil, fmt.Errorf("failed to resolve address for target server %s: %v", disk.NodeID, err) } @@ -340,7 +565,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V TargetRack: disk.Rack, TargetDC: disk.DataCenter, ExpectedSize: expectedShardSize, // Set calculated EC shard size - PlacementScore: calculateECScore(disk, sourceRack, sourceDC), + PlacementScore: calculateECScoreCandidate(disk, sourceRack, sourceDC), } plans = append(plans, plan) @@ -353,8 +578,10 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V // Log capacity utilization information using ActiveTopology's encapsulated logic totalEffectiveCapacity := int64(0) for _, plan := range plans { - effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk) - totalEffectiveCapacity += effectiveCapacity + key := ecDiskKey(plan.TargetNode, plan.TargetDisk) + if candidate, ok := planner.candidateByKey[key]; ok { + totalEffectiveCapacity += int64(candidate.FreeSlots) + } } glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots", @@ -565,18 +792,18 @@ func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks return result } -// calculateECScore calculates placement score for EC operations -// Used for logging and plan metadata -func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 { - if disk.DiskInfo == nil { +// calculateECScoreCandidate calculates placement score for EC operations. +// Used for logging and plan metadata. +func calculateECScoreCandidate(disk *placement.DiskCandidate, sourceRack, sourceDC string) float64 { + if disk == nil { return 0.0 } score := 0.0 // Prefer disks with available capacity (primary factor) - if disk.DiskInfo.MaxVolumeCount > 0 { - utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount) + if disk.MaxVolumeCount > 0 { + utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount) score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity } diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go new file mode 100644 index 000000000..06e4ce306 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -0,0 +1,172 @@ +package erasure_coding + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestECPlacementPlannerApplyReservations(t *testing.T) { + activeTopology := buildActiveTopology(t, 1, []string{"hdd"}, 10, 0) + + planner := newECPlacementPlanner(activeTopology) + require.NotNil(t, planner) + + key := ecDiskKey("10.0.0.1:8080", 0) + candidate, ok := planner.candidateByKey[key] + require.True(t, ok) + assert.Equal(t, 10, candidate.FreeSlots) + assert.Equal(t, 0, candidate.ShardCount) + assert.Equal(t, 0, candidate.LoadCount) + + shardImpact := topology.CalculateECShardStorageImpact(1, 1) + destinations := make([]topology.TaskDestinationSpec, 10) + for i := 0; i < 10; i++ { + destinations[i] = topology.TaskDestinationSpec{ + ServerID: "10.0.0.1:8080", + DiskID: 0, + StorageImpact: &shardImpact, + } + } + + planner.applyTaskReservations(1024, nil, destinations) + + candidate = planner.candidateByKey[key] + assert.Equal(t, 9, candidate.FreeSlots, "10 shard slots should reduce available volume slots by 1") + assert.Equal(t, 10, candidate.ShardCount) + assert.Equal(t, 1, candidate.LoadCount, "load should only be incremented once per disk") +} + +func TestPlanECDestinationsUsesPlanner(t *testing.T) { + activeTopology := buildActiveTopology(t, 7, []string{"hdd", "ssd"}, 100, 0) + planner := newECPlacementPlanner(activeTopology) + require.NotNil(t, planner) + + metric := &types.VolumeHealthMetrics{ + VolumeID: 1, + Server: "10.0.0.1:8080", + Size: 100 * 1024 * 1024, + Collection: "", + } + + plan, err := planECDestinations(planner, metric, NewDefaultConfig()) + require.NoError(t, err) + require.NotNil(t, plan) + assert.Equal(t, erasure_coding.TotalShardsCount, len(plan.Plans)) +} + +func TestDetectionContextCancellation(t *testing.T) { + activeTopology := buildActiveTopology(t, 5, []string{"hdd", "ssd"}, 50, 0) + clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology} + metrics := buildVolumeMetricsForIDs(50) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, _, err := Detection(ctx, metrics, clusterInfo, NewDefaultConfig(), 0) + require.ErrorIs(t, err, context.Canceled) +} + +func TestDetectionMaxResultsHonorsLimit(t *testing.T) { + activeTopology := buildActiveTopology(t, 4, []string{"hdd"}, 20, 0) + clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology} + metrics := buildVolumeMetricsForIDs(3) + + results, hasMore, err := Detection(context.Background(), metrics, clusterInfo, NewDefaultConfig(), 1) + require.NoError(t, err) + assert.Len(t, results, 1) + assert.True(t, hasMore) +} + +func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) { + activeTopology := buildActiveTopology(t, 1, []string{"hdd"}, 1, 1) + planner := newECPlacementPlanner(activeTopology) + require.NotNil(t, planner) + + metric := &types.VolumeHealthMetrics{ + VolumeID: 2, + Server: "10.0.0.1:8080", + Size: 10 * 1024 * 1024, + Collection: "", + } + + _, err := planECDestinations(planner, metric, NewDefaultConfig()) + require.Error(t, err) +} + +func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics { + metrics := make([]*types.VolumeHealthMetrics, 0, count) + now := time.Now() + for id := 1; id <= count; id++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(id), + Server: "10.0.0.1:8080", + Size: 200 * 1024 * 1024, + Collection: "", + FullnessRatio: 0.9, + LastModified: now.Add(-time.Hour), + Age: 10 * time.Minute, + }) + } + return metrics +} + +func buildActiveTopology(t *testing.T, nodeCount int, diskTypes []string, maxVolumeCount, usedVolumeCount int64) *topology.ActiveTopology { + t.Helper() + activeTopology := topology.NewActiveTopology(10) + + nodes := make([]*master_pb.DataNodeInfo, 0, nodeCount) + for i := 1; i <= nodeCount; i++ { + diskInfos := make(map[string]*master_pb.DiskInfo) + for diskIndex, diskType := range diskTypes { + used := usedVolumeCount + if used > maxVolumeCount { + used = maxVolumeCount + } + volumeInfos := make([]*master_pb.VolumeInformationMessage, 0, 200) + for vid := 1; vid <= 200; vid++ { + volumeInfos = append(volumeInfos, &master_pb.VolumeInformationMessage{ + Id: uint32(vid), + Collection: "", + DiskId: uint32(diskIndex), + }) + } + diskInfos[diskType] = &master_pb.DiskInfo{ + DiskId: uint32(diskIndex), + VolumeCount: used, + MaxVolumeCount: maxVolumeCount, + VolumeInfos: volumeInfos, + } + } + + nodes = append(nodes, &master_pb.DataNodeInfo{ + Id: fmt.Sprintf("10.0.0.%d:8080", i), + DiskInfos: diskInfos, + }) + } + + topologyInfo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: nodes, + }, + }, + }, + }, + } + + require.NoError(t, activeTopology.UpdateTopology(topologyInfo)) + return activeTopology +} diff --git a/weed/worker/tasks/erasure_coding/register.go b/weed/worker/tasks/erasure_coding/register.go index 882b6cda6..ad7e48cc8 100644 --- a/weed/worker/tasks/erasure_coding/register.go +++ b/weed/worker/tasks/erasure_coding/register.go @@ -1,6 +1,7 @@ package erasure_coding import ( + "context" "fmt" "time" @@ -58,7 +59,10 @@ func RegisterErasureCodingTask() { dialOpt, ), nil }, - DetectionFunc: Detection, + DetectionFunc: func(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + results, _, err := Detection(context.Background(), metrics, clusterInfo, config, 0) + return results, err + }, ScanInterval: 1 * time.Hour, SchedulingFunc: Scheduling, MaxConcurrent: 1,