From e9e7b29d2f77ee5cb82147d50621255410695ee3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 19 Feb 2026 20:31:09 -0800 Subject: [PATCH] fix flags --- weed/command/mini.go | 99 ++++++++++++++++++++++++++++++++++++------ weed/util/fla9/fla9.go | 1 + 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/weed/command/mini.go b/weed/command/mini.go index bb7b6ebc3..0c4e00666 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -461,6 +461,7 @@ func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string, reserv // Returns an error if an explicitly specified port is unavailable. // This should be called before starting any services func ensureAllPortsAvailableOnIP(bindIp string) error { + glog.V(1).Infof("ensureAllPortsAvailableOnIP started for %s", bindIp) portConfigs := []struct { port *int name string @@ -530,7 +531,9 @@ func ensureAllPortsAvailableOnIP(bindIp string) error { // Initialize all gRPC ports before services start // This ensures they won't be recalculated and cause conflicts // All gRPC port handling (calculation, validation, and assignment) is performed exclusively in initializeGrpcPortsOnIP - initializeGrpcPortsOnIP(bindIp) + if err := initializeGrpcPortsOnIP(bindIp); err != nil { + return err + } // Log the final port configuration icebergPortStr := "disabled" @@ -552,7 +555,7 @@ func ensureAllPortsAvailableOnIP(bindIp string) error { // initializeGrpcPortsOnIP initializes all gRPC ports based on their HTTP ports on a specific IP // If a gRPC port is 0, it will be set to httpPort + GrpcPortOffset // This must be called after HTTP ports are finalized and before services start -func initializeGrpcPortsOnIP(bindIp string) { +func initializeGrpcPortsOnIP(bindIp string) error { // Track gRPC ports allocated during this function to prevent collisions between services // when multiple services need fallback port allocation allocatedGrpcPorts := make(map[int]bool) @@ -581,9 +584,28 @@ func initializeGrpcPortsOnIP(bindIp string) { for _, config := range grpcConfigs { if config.grpcPort == nil { + glog.V(1).Infof("Skipping gRPC port initialization for %s (grpcPort is nil)", config.name) continue } + glog.V(1).Infof("Initializing gRPC port for %s: current value %d, http port %d", config.name, *config.grpcPort, *config.httpPort) + + // Check if gRPC port was explicitly passed + grpcFlagName := "" + switch config.name { + case "Master": + grpcFlagName = "master.port.grpc" + case "Filer": + grpcFlagName = "filer.port.grpc" + case "Volume": + grpcFlagName = "volume.port.grpc" + case "S3": + grpcFlagName = "s3.port.grpc" + case "Admin": + grpcFlagName = "admin.port.grpc" + } + isExplicitlyPassed := grpcFlagName != "" && isFlagPassed(grpcFlagName) + // If gRPC port is 0, calculate it from HTTP port if *config.grpcPort == 0 { *config.grpcPort = *config.httpPort + GrpcPortOffset @@ -592,6 +614,9 @@ func initializeGrpcPortsOnIP(bindIp string) { // Verify the gRPC port is available (whether calculated or explicitly set) // Check on both specific IP and all interfaces, and check against already allocated ports if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] { + if isExplicitlyPassed { + return fmt.Errorf("explicitly specified gRPC port %d for %s is not available", *config.grpcPort, config.name) + } glog.Warningf("gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name) originalPort := *config.grpcPort newPort := findAvailablePortOnIP(bindIp, originalPort+1, 100, allocatedGrpcPorts) @@ -603,8 +628,9 @@ func initializeGrpcPortsOnIP(bindIp string) { } } allocatedGrpcPorts[*config.grpcPort] = true - glog.V(1).Infof("%s gRPC port set to %d", config.name, *config.grpcPort) + glog.Infof("%s gRPC port finalized to %d", config.name, *config.grpcPort) } + return nil } // loadMiniConfigurationFile reads the mini.options file and returns parsed options @@ -728,10 +754,22 @@ func saveMiniConfiguration(dataFolder string) error { func runMini(cmd *Command, args []string) bool { + glog.V(1).Infof("--- runMini started (IP: %s) ---", *miniIp) + glog.V(1).Infof("Initial ports - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d", + *miniMasterOptions.port, *miniFilerOptions.port, *miniOptions.v.port, + *miniS3Options.port, *miniAdminOptions.port) + glog.V(1).Infof("Initial gRPC ports - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d", + *miniMasterOptions.portGrpc, *miniFilerOptions.portGrpc, *miniOptions.v.portGrpc, + *miniS3Options.portGrpc, *miniAdminOptions.grpcPort) + // Capture which port flags were explicitly passed on CLI BEFORE config file is applied // This is necessary to distinguish user-specified ports from defaults or config file options explicitPortFlags = make(map[string]bool) - portFlagNames := []string{"master.port", "filer.port", "volume.port", "s3.port", "s3.port.iceberg", "webdav.port", "admin.port", "s3.iam.readOnly"} + portFlagNames := []string{ + "master.port", "filer.port", "volume.port", "s3.port", "s3.port.iceberg", "webdav.port", "admin.port", + "master.port.grpc", "filer.port.grpc", "volume.port.grpc", "s3.port.grpc", "admin.port.grpc", + "s3.iam.readOnly", + } for _, flagName := range portFlagNames { explicitPortFlags[flagName] = isFlagPassed(flagName) } @@ -759,7 +797,7 @@ func runMini(cmd *Command, args []string) bool { // Ensure all ports are available, find alternatives if needed if err := ensureAllPortsAvailableOnIP(bindIp); err != nil { glog.Errorf("Port allocation failed: %v", err) - os.Exit(1) + return false } // Set master.peers to "none" if not specified (single master mode) @@ -860,6 +898,11 @@ func runMini(cmd *Command, args []string) bool { // startMiniServices starts all mini services with proper dependency coordination func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { + ctx := MiniClusterCtx + if ctx == nil { + ctx = context.Background() + } + // Determine bind IP for health checks bindIp := getBindIp() @@ -869,7 +912,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniMasterOptions.port) // Wait for master to be ready - waitForServiceReady("Master", *miniMasterOptions.port, bindIp) + if !waitForServiceReady(ctx, "Master", *miniMasterOptions.port, bindIp) { + return + } // Start Volume server (depends on master) go startMiniService("Volume", func() { @@ -878,7 +923,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniOptions.v.port) // Wait for volume to be ready - waitForServiceReady("Volume", *miniOptions.v.port, bindIp) + if !waitForServiceReady(ctx, "Volume", *miniOptions.v.port, bindIp) { + return + } // Start Filer (depends on master and volume) go startMiniService("Filer", func() { @@ -886,7 +933,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniFilerOptions.port) // Wait for filer to be ready - waitForServiceReady("Filer", *miniFilerOptions.port, bindIp) + if !waitForServiceReady(ctx, "Filer", *miniFilerOptions.port, bindIp) { + return + } // Start S3 and WebDAV in parallel (both depend on filer) if *miniEnableS3 { @@ -903,13 +952,19 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { // Wait for services to be ready if *miniEnableS3 { - waitForServiceReady("S3", *miniS3Options.port, bindIp) + if !waitForServiceReady(ctx, "S3", *miniS3Options.port, bindIp) { + return + } if miniS3Options.portIceberg != nil && *miniS3Options.portIceberg > 0 { - waitForServiceReady("Iceberg", *miniS3Options.portIceberg, bindIp) + if !waitForServiceReady(ctx, "Iceberg", *miniS3Options.portIceberg, bindIp) { + return + } } } if *miniEnableWebDAV { - waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp) + if !waitForServiceReady(ctx, "WebDAV", *miniWebDavOptions.port, bindIp) { + return + } } // Start Admin with worker (depends on master, filer, S3, WebDAV) @@ -923,7 +978,7 @@ func startMiniService(name string, fn func(), port int) { } // waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections -func waitForServiceReady(name string, port int, bindIp string) { +func waitForServiceReady(ctx context.Context, name string, port int, bindIp string) bool { address := fmt.Sprintf("http://%s:%d", bindIp, port) healthAddr := getHealthCheckAddr(address) maxAttempts := 30 // 30 * 200ms = 6 seconds max wait @@ -933,19 +988,30 @@ func waitForServiceReady(name string, port int, bindIp string) { } for attempt < maxAttempts { + select { + case <-ctx.Done(): + return false + default: + } resp, err := client.Get(healthAddr) if err == nil { resp.Body.Close() glog.Infof("%s service is ready at %s", name, address) - return + return true } attempt++ - time.Sleep(200 * time.Millisecond) + + select { + case <-ctx.Done(): + return false + case <-time.After(200 * time.Millisecond): + } } // Service failed to become ready, log warning but don't fail startup // (services may still work even if health check endpoint isn't responding immediately) glog.Warningf("Health check for %s failed (service may still be functional, retries may succeed)", name) + return true } // startS3Service initializes and starts the S3 server @@ -1002,6 +1068,11 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini // If it's still 0, that indicates a problem with the port initialization sequence if *miniAdminOptions.grpcPort == 0 { + // If context is already cancelled, don't fatal - we're shutting down anyway + if ctx.Err() != nil { + glog.Infof("Admin gRPC port was 0, but context is cancelled. Skipping startup.") + return + } glog.Fatalf("Admin gRPC port was not initialized before startAdminServer. This indicates a problem with the port initialization sequence.") } diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go index 390e35333..6aa9de95c 100644 --- a/weed/util/fla9/fla9.go +++ b/weed/util/fla9/fla9.go @@ -905,6 +905,7 @@ func (f *FlagSet) parseOne() (bool, error) { // are defined and before flags are accessed by the program. // The return value will be ErrHelp if -help or -h were set but not defined. func (f *FlagSet) Parse(arguments []string) error { + f.actual = nil if _, ok := f.formal[DefaultConfigFlagName]; !ok { f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format") }