diff --git a/weed/command/mini.go b/weed/command/mini.go index 2602a4a23..fc359f904 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -44,6 +44,7 @@ const ( minVolumeSizeMB = 64 // Minimum volume size in MB defaultMiniVolumeSizeMB = 128 // Default volume size for mini mode maxVolumeSizeMB = 1024 // Maximum volume size in MB (1GB) + GrpcPortOffset = 10000 // Offset used to calculate gRPC port from HTTP port ) var ( @@ -54,6 +55,8 @@ var ( miniWebDavOptions WebDavOption miniAdminOptions AdminOptions createdInitialIAM bool // Track if initial IAM config was created from env vars + // Track which port flags were explicitly passed on CLI before config file is applied + explicitPortFlags map[string]bool ) func init() { @@ -117,6 +120,15 @@ var ( miniS3AllowDeleteBucketNotEmpty = cmdMini.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") ) +// getBindIp determines the bind IP address based on miniIp and miniBindIp flags +// Returns miniBindIp if set (non-empty), otherwise returns miniIp +func getBindIp() string { + if *miniBindIp != "" { + return *miniBindIp + } + return *miniIp +} + // initMiniCommonFlags initializes common mini flags func initMiniCommonFlags() { miniOptions.cpuprofile = cmdMini.Flag.String("cpuprofile", "", "cpu profile output file") @@ -242,7 +254,7 @@ func initMiniWebDAVFlags() { // initMiniAdminFlags initializes Admin server flag options func initMiniAdminFlags() { miniAdminOptions.port = cmdMini.Flag.Int("admin.port", 23646, "admin server http listen port") - miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + 10000)") + miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + GrpcPortOffset)") miniAdminOptions.master = cmdMini.Flag.String("admin.master", "", "master server address (automatically set)") miniAdminOptions.dataDir = cmdMini.Flag.String("admin.dataDir", "", "directory to store admin configuration and data files") miniAdminOptions.adminUser = cmdMini.Flag.String("admin.user", "admin", "admin interface username") @@ -326,6 +338,221 @@ func isFlagPassed(name string) bool { return found } +// isPortOpenOnIP checks if a port is available for binding on a specific IP address +func isPortOpenOnIP(ip string, port int) bool { + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + return false + } + listener.Close() + return true +} + +// isPortAvailable checks if a port is available on any interface +// This is more comprehensive than checking a single IP +func isPortAvailable(port int) bool { + // Try to listen on all interfaces (0.0.0.0) + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return false + } + listener.Close() + return true +} + +// findAvailablePortOnIP finds the next available port on a specific IP starting from the given port +// It skips any ports that are in the reservedPorts map (for gRPC port collision avoidance) +// It returns the first available port found within maxAttempts, or 0 if none found +func findAvailablePortOnIP(ip string, startPort int, maxAttempts int, reservedPorts map[int]bool) int { + for i := 0; i < maxAttempts; i++ { + port := startPort + i + // Skip ports reserved for gRPC calculation + if reservedPorts[port] { + continue + } + // Check on both the specific IP and on all interfaces for maximum reliability + if isPortOpenOnIP(ip, port) && isPortAvailable(port) { + return port + } + } + // If no port found, return 0 to indicate failure + return 0 +} + +// ensurePortAvailableOnIP ensures a port pointer points to an available port on a specific IP +// If the port is not available, it finds the next available port and updates the pointer +// The reservedPorts map contains ports that should not be allocated (for gRPC collision avoidance) +func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string, reservedPorts map[int]bool, flagName string) error { + if portPtr == nil { + return nil + } + + original := *portPtr + + // Check if this port was explicitly specified by the user (from CLI, before config file was applied) + isExplicitPort := explicitPortFlags[flagName] + + // Skip if this port is reserved for gRPC calculation + if reservedPorts[original] { + if isExplicitPort { + return fmt.Errorf("port %d for %s (specified by flag %s) is reserved for gRPC calculation and cannot be used", original, serviceName, flagName) + } + glog.Warningf("Port %d for %s is reserved for gRPC calculation, finding alternative...", original, serviceName) + newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts) + if newPort == 0 { + glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original) + } else { + glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original) + *portPtr = newPort + } + return nil + } + + // Check on both the specific IP and on all interfaces (0.0.0.0) for maximum reliability + if !isPortOpenOnIP(ip, original) || !isPortAvailable(original) { + // If explicitly specified, fail immediately with the originally requested port + if isExplicitPort { + return fmt.Errorf("port %d for %s (specified by flag %s) is not available on %s and cannot be used", original, serviceName, flagName, ip) + } + // For default ports, try to find an alternative + glog.Warningf("Port %d for %s is not available on %s, finding alternative port...", original, serviceName, ip) + newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts) + if newPort == 0 { + glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original) + } else { + glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original) + *portPtr = newPort + } + } else { + glog.V(1).Infof("Port %d for %s is available on %s", original, serviceName, ip) + } + return nil +} + +// ensureAllPortsAvailableOnIP ensures all mini service ports are available on a specific IP +// Returns an error if an explicitly specified port is unavailable. +// This should be called before starting any services +func ensureAllPortsAvailableOnIP(bindIp string) error { + portConfigs := []struct { + port *int + name string + flagName string + grpcPtr *int + }{ + {miniMasterOptions.port, "Master", "master.port", miniMasterOptions.portGrpc}, + {miniFilerOptions.port, "Filer", "filer.port", miniFilerOptions.portGrpc}, + {miniOptions.v.port, "Volume", "volume.port", miniOptions.v.portGrpc}, + {miniS3Options.port, "S3", "s3.port", miniS3Options.portGrpc}, + {miniWebDavOptions.port, "WebDAV", "webdav.port", nil}, + {miniAdminOptions.port, "Admin", "admin.port", miniAdminOptions.grpcPort}, + } + + // First, reserve all gRPC ports that will be calculated to prevent HTTP port allocation from using them + // This prevents collisions like: HTTP port moves to X, then gRPC port is calculated as Y where Y == X + reservedPorts := make(map[int]bool) + for _, config := range portConfigs { + if config.grpcPtr != nil && *config.grpcPtr == 0 { + // This gRPC port will be calculated as httpPort + GrpcPortOffset + calculatedGrpcPort := *config.port + GrpcPortOffset + reservedPorts[calculatedGrpcPort] = true + } + } + + // Check all HTTP ports sequentially to avoid race conditions + // Each port check and allocation must complete before the next one starts + // to prevent multiple goroutines from claiming the same available port + // Also avoid allocating ports that are reserved for gRPC calculation + for _, config := range portConfigs { + original := *config.port + if err := ensurePortAvailableOnIP(config.port, config.name, bindIp, reservedPorts, config.flagName); err != nil { + return err + } + // If port was changed, update the reserved gRPC ports mapping + if *config.port != original && config.grpcPtr != nil && *config.grpcPtr == 0 { + delete(reservedPorts, original+GrpcPortOffset) + reservedPorts[*config.port+GrpcPortOffset] = true + } + } + + // Initialize all gRPC ports before services start + // This ensures they won't be recalculated and cause conflicts + // All gRPC port handling (calculation, validation, and assignment) is performed exclusively in initializeGrpcPortsOnIP + initializeGrpcPortsOnIP(bindIp) + + // Log the final port configuration + glog.Infof("Final port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, WebDAV: %d, Admin: %d", + *miniMasterOptions.port, *miniFilerOptions.port, *miniOptions.v.port, + *miniS3Options.port, *miniWebDavOptions.port, *miniAdminOptions.port) + + // Log gRPC ports too (now finalized) + glog.Infof("gRPC port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d", + *miniMasterOptions.portGrpc, *miniFilerOptions.portGrpc, *miniOptions.v.portGrpc, + *miniS3Options.portGrpc, *miniAdminOptions.grpcPort) + + return nil +} + +// initializeGrpcPortsOnIP initializes all gRPC ports based on their HTTP ports on a specific IP +// If a gRPC port is 0, it will be set to httpPort + GrpcPortOffset +// This must be called after HTTP ports are finalized and before services start +func initializeGrpcPortsOnIP(bindIp string) { + // Track gRPC ports allocated during this function to prevent collisions between services + // when multiple services need fallback port allocation + allocatedGrpcPorts := make(map[int]bool) + + grpcConfigs := []struct { + httpPort *int + grpcPort *int + name string + }{ + {miniMasterOptions.port, miniMasterOptions.portGrpc, "Master"}, + {miniFilerOptions.port, miniFilerOptions.portGrpc, "Filer"}, + {miniOptions.v.port, miniOptions.v.portGrpc, "Volume"}, + {miniS3Options.port, miniS3Options.portGrpc, "S3"}, + {miniAdminOptions.port, miniAdminOptions.grpcPort, "Admin"}, + } + + for _, config := range grpcConfigs { + if config.grpcPort == nil { + continue + } + + // If gRPC port is 0, calculate it + if *config.grpcPort == 0 { + calculatedPort := *config.httpPort + GrpcPortOffset + // Check if calculated port is available (on both specific IP and all interfaces) + // Also check if it was already allocated to another service in this function + if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) || allocatedGrpcPorts[calculatedPort] { + glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative...", calculatedPort, config.name) + newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts) + if newPort == 0 { + glog.Errorf("Could not find available gRPC port for %s starting from %d, will use calculated %d and fail on binding", config.name, calculatedPort+1, calculatedPort) + } else { + calculatedPort = newPort + glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, *config.httpPort+GrpcPortOffset) + } + } + *config.grpcPort = calculatedPort + allocatedGrpcPorts[calculatedPort] = true + glog.V(1).Infof("%s gRPC port initialized to %d", config.name, calculatedPort) + } else { + // gRPC port was explicitly set, verify it's still available (check on both specific IP and all interfaces) + // Also check if it was already allocated to another service in this function + if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] { + glog.Warningf("Explicitly set gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name) + newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100, allocatedGrpcPorts) + if newPort == 0 { + glog.Errorf("Could not find available gRPC port for %s starting from %d, will use original %d and fail on binding", config.name, *config.grpcPort+1, *config.grpcPort) + } else { + glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, *config.grpcPort) + *config.grpcPort = newPort + } + } + allocatedGrpcPorts[*config.grpcPort] = true + } + } +} + // loadMiniConfigurationFile reads the mini.options file and returns parsed options // File format: one option per line, without leading dash (e.g., "ip=127.0.0.1") func loadMiniConfigurationFile(dataFolder string) (map[string]string, error) { @@ -380,6 +607,11 @@ func loadMiniConfigurationFile(dataFolder string) (map[string]string, error) { // applyConfigFileOptions sets command-line flags from loaded configuration file func applyConfigFileOptions(options map[string]string) { for key, value := range options { + // Skip port flags that were explicitly passed on CLI + if explicitPortFlags[key] { + glog.V(2).Infof("Skipping config file option %s=%s (explicitly specified on command line)", key, value) + continue + } // Set the flag value if it hasn't been explicitly set on command line flag := cmdMini.Flag.Lookup(key) if flag != nil { @@ -442,6 +674,14 @@ func saveMiniConfiguration(dataFolder string) error { func runMini(cmd *Command, args []string) bool { + // Capture which port flags were explicitly passed on CLI BEFORE config file is applied + // This is necessary to distinguish user-specified ports from defaults or config file options + explicitPortFlags = make(map[string]bool) + portFlagNames := []string{"master.port", "filer.port", "volume.port", "s3.port", "webdav.port", "admin.port"} + for _, flagName := range portFlagNames { + explicitPortFlags[flagName] = isFlagPassed(flagName) + } + // Load configuration from file if it exists configOptions, err := loadMiniConfigurationFile(*miniDataFolders) if err != nil { @@ -459,6 +699,15 @@ func runMini(cmd *Command, args []string) bool { grace.SetupProfiling(*miniOptions.cpuprofile, *miniOptions.memprofile) + // Determine bind IP + bindIp := getBindIp() + + // Ensure all ports are available, find alternatives if needed + if err := ensureAllPortsAvailableOnIP(bindIp); err != nil { + glog.Errorf("Port allocation failed: %v", err) + os.Exit(1) + } + // Set master.peers to "none" if not specified (single master mode) if *miniMasterOptions.peers == "" { *miniMasterOptions.peers = "none" @@ -552,13 +801,16 @@ func runMini(cmd *Command, args []string) bool { // startMiniServices starts all mini services with proper dependency coordination func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { + // Determine bind IP for health checks + bindIp := getBindIp() + // Start Master server (no dependencies) go startMiniService("Master", func() { startMaster(miniMasterOptions, miniWhiteList) }, *miniMasterOptions.port) // Wait for master to be ready - waitForServiceReady("Master", *miniMasterOptions.port) + waitForServiceReady("Master", *miniMasterOptions.port, bindIp) // Start Volume server (depends on master) go startMiniService("Volume", func() { @@ -567,7 +819,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniOptions.v.port) // Wait for volume to be ready - waitForServiceReady("Volume", *miniOptions.v.port) + waitForServiceReady("Volume", *miniOptions.v.port, bindIp) // Start Filer (depends on master and volume) go startMiniService("Filer", func() { @@ -575,7 +827,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniFilerOptions.port) // Wait for filer to be ready - waitForServiceReady("Filer", *miniFilerOptions.port) + waitForServiceReady("Filer", *miniFilerOptions.port, bindIp) // Start S3 and WebDAV in parallel (both depend on filer) go startMiniService("S3", func() { @@ -587,8 +839,8 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniWebDavOptions.port) // Wait for both S3 and WebDAV to be ready - waitForServiceReady("S3", *miniS3Options.port) - waitForServiceReady("WebDAV", *miniWebDavOptions.port) + waitForServiceReady("S3", *miniS3Options.port, bindIp) + waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp) // Start Admin with worker (depends on master, filer, S3, WebDAV) go startMiniAdminWithWorker(allServicesReady) @@ -601,8 +853,8 @@ func startMiniService(name string, fn func(), port int) { } // waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections -func waitForServiceReady(name string, port int) { - address := fmt.Sprintf("http://127.0.0.1:%d", port) +func waitForServiceReady(name string, port int, bindIp string) { + address := fmt.Sprintf("http://%s:%d", bindIp, port) maxAttempts := 30 // 30 * 200ms = 6 seconds max wait attempt := 0 client := &http.Client{ @@ -679,8 +931,29 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // Set admin options *miniAdminOptions.master = masterAddr + + // gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini + // If it's still 0, that indicates a problem with the port initialization sequence + // This defensive fallback handles edge cases where port initialization may have been skipped + // or failed silently (e.g., due to configuration changes or error handling paths) if *miniAdminOptions.grpcPort == 0 { - *miniAdminOptions.grpcPort = *miniAdminOptions.port + 10000 + glog.Warningf("Admin gRPC port was not initialized before startAdminServer, attempting fallback initialization...") + // Use the same availability checking logic as initializeGrpcPortsOnIP + calculatedPort := *miniAdminOptions.port + GrpcPortOffset + if !isPortOpenOnIP(getBindIp(), calculatedPort) || !isPortAvailable(calculatedPort) { + glog.Warningf("Calculated fallback gRPC port %d is not available, finding alternative...", calculatedPort) + newPort := findAvailablePortOnIP(getBindIp(), calculatedPort+1, 100, make(map[int]bool)) + if newPort == 0 { + glog.Errorf("Could not find available gRPC port for Admin starting from %d, will use calculated %d and fail on binding", calculatedPort+1, calculatedPort) + *miniAdminOptions.grpcPort = calculatedPort + } else { + glog.Infof("Fallback: using gRPC port %d for Admin", newPort) + *miniAdminOptions.grpcPort = newPort + } + } else { + *miniAdminOptions.grpcPort = calculatedPort + glog.Infof("Fallback: Admin gRPC port initialized to %d", calculatedPort) + } } // Create data directory if specified diff --git a/weed/worker/client.go b/weed/worker/client.go index a080e58cf..d562b8703 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -522,7 +522,8 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { } // Wait for registration response - timeout := time.NewTimer(10 * time.Second) + // Use longer timeout for reconnections since admin server might be busy + timeout := time.NewTimer(30 * time.Second) defer timeout.Stop() for {