Browse Source

feat: add automatic port detection and fallback for mini command

- Added port availability detection using TCP binding tests
- Implemented port fallback mechanism searching for available ports
- Support for both HTTP and gRPC port handling
- IP-aware port checking using actual service bind address
- Dual-interface verification (specific IP and wildcard 0.0.0.0)
- All services (Master, Volume, Filer, S3, WebDAV, Admin) auto-reallocate to available ports
- Enables multiple mini instances to run simultaneously without conflicts
pull/7838/head
Chris Lu 2 months ago
parent
commit
fa84efbf7f
  1. 286
      weed/command/mini.go

286
weed/command/mini.go

@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@ -44,7 +45,6 @@ const (
minVolumeSizeMB = 64 // Minimum volume size in MB
defaultMiniVolumeSizeMB = 128 // Default volume size for mini mode
maxVolumeSizeMB = 1024 // Maximum volume size in MB (1GB)
GrpcPortOffset = 10000 // Offset used to calculate gRPC port from HTTP port
)
var (
@ -55,8 +55,6 @@ var (
miniWebDavOptions WebDavOption
miniAdminOptions AdminOptions
createdInitialIAM bool // Track if initial IAM config was created from env vars
// Track which port flags were explicitly passed on CLI before config file is applied
explicitPortFlags map[string]bool
)
func init() {
@ -120,15 +118,6 @@ var (
miniS3AllowDeleteBucketNotEmpty = cmdMini.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
)
// getBindIp determines the bind IP address based on miniIp and miniBindIp flags
// Returns miniBindIp if set (non-empty), otherwise returns miniIp
func getBindIp() string {
if *miniBindIp != "" {
return *miniBindIp
}
return *miniIp
}
// initMiniCommonFlags initializes common mini flags
func initMiniCommonFlags() {
miniOptions.cpuprofile = cmdMini.Flag.String("cpuprofile", "", "cpu profile output file")
@ -254,7 +243,7 @@ func initMiniWebDAVFlags() {
// initMiniAdminFlags initializes Admin server flag options
func initMiniAdminFlags() {
miniAdminOptions.port = cmdMini.Flag.Int("admin.port", 23646, "admin server http listen port")
miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + GrpcPortOffset)")
miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + 10000)")
miniAdminOptions.master = cmdMini.Flag.String("admin.master", "", "master server address (automatically set)")
miniAdminOptions.dataDir = cmdMini.Flag.String("admin.dataDir", "", "directory to store admin configuration and data files")
miniAdminOptions.adminUser = cmdMini.Flag.String("admin.user", "admin", "admin interface username")
@ -338,6 +327,11 @@ func isFlagPassed(name string) bool {
return found
}
// isPortOpen checks if a port is available for binding on a specific IP address
func isPortOpen(port int) bool {
return isPortOpenOnIP("127.0.0.1", port)
}
// isPortOpenOnIP checks if a port is available for binding on a specific IP address
func isPortOpenOnIP(ip string, port int) bool {
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port))
@ -360,146 +354,146 @@ func isPortAvailable(port int) bool {
return true
}
// findAvailablePort finds the next available port starting from the given port
// It returns the first available port found within maxAttempts
func findAvailablePort(startPort int, maxAttempts int) int {
return findAvailablePortOnIP("127.0.0.1", startPort, maxAttempts)
}
// findAvailablePortOnIP finds the next available port on a specific IP starting from the given port
// It skips any ports that are in the reservedPorts map (for gRPC port collision avoidance)
// It returns the first available port found within maxAttempts, or 0 if none found
func findAvailablePortOnIP(ip string, startPort int, maxAttempts int, reservedPorts map[int]bool) int {
// It returns the first available port found within maxAttempts
func findAvailablePortOnIP(ip string, startPort int, maxAttempts int) int {
for i := 0; i < maxAttempts; i++ {
port := startPort + i
// Skip ports reserved for gRPC calculation
if reservedPorts[port] {
continue
}
// Check on both the specific IP and on all interfaces for maximum reliability
if isPortOpenOnIP(ip, port) && isPortAvailable(port) {
return port
}
}
// If no port found, return 0 to indicate failure
return 0
// If no port found, return the original (will fail during binding)
return startPort
}
// ensurePortAvailable ensures a port pointer points to an available port
// If the port is not available, it finds the next available port and updates the pointer
func ensurePortAvailable(portPtr *int, serviceName string) {
ensurePortAvailableOnIP(portPtr, serviceName, "127.0.0.1")
}
// ensurePortAvailableOnIP ensures a port pointer points to an available port on a specific IP
// If the port is not available, it finds the next available port and updates the pointer
// The reservedPorts map contains ports that should not be allocated (for gRPC collision avoidance)
func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string, reservedPorts map[int]bool, flagName string) error {
func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string) {
if portPtr == nil {
return nil
return
}
original := *portPtr
// Check if this port was explicitly specified by the user (from CLI, before config file was applied)
isExplicitPort := explicitPortFlags[flagName]
// Skip if this port is reserved for gRPC calculation
if reservedPorts[original] {
if isExplicitPort {
return fmt.Errorf("port %d for %s (specified by flag %s) is reserved for gRPC calculation and cannot be used", original, serviceName, flagName)
}
glog.Warningf("Port %d for %s is reserved for gRPC calculation, finding alternative...", original, serviceName)
newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts)
if newPort == 0 {
glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original)
} else {
glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original)
*portPtr = newPort
}
return nil
}
// Check on both the specific IP and on all interfaces (0.0.0.0) for maximum reliability
if !isPortOpenOnIP(ip, original) || !isPortAvailable(original) {
// If explicitly specified, fail immediately with the originally requested port
if isExplicitPort {
return fmt.Errorf("port %d for %s (specified by flag %s) is not available on %s and cannot be used", original, serviceName, flagName, ip)
}
// For default ports, try to find an alternative
glog.Warningf("Port %d for %s is not available on %s, finding alternative port...", original, serviceName, ip)
newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts)
if newPort == 0 {
glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original)
} else {
newPort := findAvailablePortOnIP(ip, original+1, 100)
if newPort != original {
glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original)
*portPtr = newPort
}
} else {
glog.V(1).Infof("Port %d for %s is available on %s", original, serviceName, ip)
}
return nil
}
// ensureAllPortsAvailable ensures all mini service ports are available
// This should be called before starting any services
func ensureAllPortsAvailable() {
ensureAllPortsAvailableOnIP("127.0.0.1")
}
// ensureAllPortsAvailableOnIP ensures all mini service ports are available on a specific IP
// Returns an error if an explicitly specified port is unavailable.
// This should be called before starting any services
func ensureAllPortsAvailableOnIP(bindIp string) error {
func ensureAllPortsAvailableOnIP(bindIp string) {
portConfigs := []struct {
port *int
name string
flagName string
grpcPtr *int
port *int
name string
grpcPtr *int
}{
{miniMasterOptions.port, "Master", "master.port", miniMasterOptions.portGrpc},
{miniFilerOptions.port, "Filer", "filer.port", miniFilerOptions.portGrpc},
{miniOptions.v.port, "Volume", "volume.port", miniOptions.v.portGrpc},
{miniS3Options.port, "S3", "s3.port", miniS3Options.portGrpc},
{miniWebDavOptions.port, "WebDAV", "webdav.port", nil},
{miniAdminOptions.port, "Admin", "admin.port", miniAdminOptions.grpcPort},
{miniMasterOptions.port, "Master", miniMasterOptions.portGrpc},
{miniFilerOptions.port, "Filer", miniFilerOptions.portGrpc},
{miniOptions.v.port, "Volume", miniOptions.v.portGrpc},
{miniS3Options.port, "S3", miniS3Options.portGrpc},
{miniWebDavOptions.port, "WebDAV", nil},
{miniAdminOptions.port, "Admin", miniAdminOptions.grpcPort},
}
// First, reserve all gRPC ports that will be calculated to prevent HTTP port allocation from using them
// This prevents collisions like: HTTP port moves to X, then gRPC port is calculated as Y where Y == X
reservedPorts := make(map[int]bool)
// Check all HTTP ports in parallel to be efficient
var wg sync.WaitGroup
for _, config := range portConfigs {
if config.grpcPtr != nil && *config.grpcPtr == 0 {
// This gRPC port will be calculated as httpPort + GrpcPortOffset
calculatedGrpcPort := *config.port + GrpcPortOffset
reservedPorts[calculatedGrpcPort] = true
}
wg.Add(1)
go func(portPtr *int, name string, grpcPtr *int) {
defer wg.Done()
// Check main port on the specific IP
ensurePortAvailableOnIP(portPtr, name, bindIp)
}(config.port, config.name, config.grpcPtr)
}
// Check all HTTP ports sequentially to avoid race conditions
// Each port check and allocation must complete before the next one starts
// to prevent multiple goroutines from claiming the same available port
// Also avoid allocating ports that are reserved for gRPC calculation
wg.Wait()
// Now handle gRPC ports sequentially after HTTP ports are finalized
// gRPC ports can be:
// 1. Explicitly set by user (any independent value)
// 2. Auto-calculated as httpPort + 10000 if set to 0
// We need to check availability for both cases
for _, config := range portConfigs {
original := *config.port
if err := ensurePortAvailableOnIP(config.port, config.name, bindIp, reservedPorts, config.flagName); err != nil {
return err
if config.grpcPtr == nil {
continue
}
// If port was changed, update the reserved gRPC ports mapping
if *config.port != original && config.grpcPtr != nil && *config.grpcPtr == 0 {
delete(reservedPorts, original+GrpcPortOffset)
reservedPorts[*config.port+GrpcPortOffset] = true
httpPort := *config.port
grpcPort := *config.grpcPtr
// If gRPC port is 0, it will be auto-calculated later as httpPort + 10000
// Pre-calculate and check if that value would be available
if grpcPort == 0 {
calculatedGrpcPort := httpPort + 10000
// Check if the calculated port would be available (on both specific IP and all interfaces)
if !isPortOpenOnIP(bindIp, calculatedGrpcPort) || !isPortAvailable(calculatedGrpcPort) {
glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative port...", calculatedGrpcPort, config.name)
newPort := findAvailablePortOnIP(bindIp, calculatedGrpcPort+1, 100)
glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, calculatedGrpcPort)
*config.grpcPtr = newPort
}
} else {
// gRPC port was explicitly set by user, check if it's available (on both specific IP and all interfaces)
if !isPortOpenOnIP(bindIp, grpcPort) || !isPortAvailable(grpcPort) {
glog.Warningf("gRPC port %d for %s is not available, finding alternative port...", grpcPort, config.name)
newPort := findAvailablePortOnIP(bindIp, grpcPort+1, 100)
if newPort != grpcPort {
glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, grpcPort)
*config.grpcPtr = newPort
}
}
}
}
// Initialize all gRPC ports before services start
// This ensures they won't be recalculated and cause conflicts
// All gRPC port handling (calculation, validation, and assignment) is performed exclusively in initializeGrpcPortsOnIP
initializeGrpcPortsOnIP(bindIp)
// Log the final port configuration
glog.Infof("Final port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, WebDAV: %d, Admin: %d",
*miniMasterOptions.port, *miniFilerOptions.port, *miniOptions.v.port,
*miniS3Options.port, *miniWebDavOptions.port, *miniAdminOptions.port)
// Log gRPC ports too (now finalized)
// Log gRPC ports too
glog.Infof("gRPC port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d",
*miniMasterOptions.portGrpc, *miniFilerOptions.portGrpc, *miniOptions.v.portGrpc,
*miniS3Options.portGrpc, *miniAdminOptions.grpcPort)
return nil
// Initialize all gRPC ports before services start
// This ensures they won't be recalculated and cause conflicts
initializeGrpcPortsOnIP(bindIp)
}
// initializeGrpcPortsOnIP initializes all gRPC ports based on their HTTP ports on a specific IP
// If a gRPC port is 0, it will be set to httpPort + GrpcPortOffset
// If a gRPC port is 0, it will be set to httpPort + 10000
// This must be called after HTTP ports are finalized and before services start
func initializeGrpcPortsOnIP(bindIp string) {
// Track gRPC ports allocated during this function to prevent collisions between services
// when multiple services need fallback port allocation
allocatedGrpcPorts := make(map[int]bool)
grpcConfigs := []struct {
httpPort *int
grpcPort *int
@ -519,36 +513,21 @@ func initializeGrpcPortsOnIP(bindIp string) {
// If gRPC port is 0, calculate it
if *config.grpcPort == 0 {
calculatedPort := *config.httpPort + GrpcPortOffset
// Check if calculated port is available (on both specific IP and all interfaces)
// Also check if it was already allocated to another service in this function
if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) || allocatedGrpcPorts[calculatedPort] {
calculatedPort := *config.httpPort + 10000
// Double-check if calculated port is available, find alternative if not (check on both specific IP and all interfaces)
if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) {
glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative...", calculatedPort, config.name)
newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts)
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use calculated %d and fail on binding", config.name, calculatedPort+1, calculatedPort)
} else {
calculatedPort = newPort
glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, *config.httpPort+GrpcPortOffset)
}
calculatedPort = findAvailablePortOnIP(bindIp, calculatedPort+1, 100)
}
*config.grpcPort = calculatedPort
allocatedGrpcPorts[calculatedPort] = true
glog.V(1).Infof("%s gRPC port initialized to %d", config.name, calculatedPort)
} else {
// gRPC port was explicitly set, verify it's still available (check on both specific IP and all interfaces)
// Also check if it was already allocated to another service in this function
if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] {
if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) {
glog.Warningf("Explicitly set gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name)
newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100, allocatedGrpcPorts)
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for %s starting from %d, will use original %d and fail on binding", config.name, *config.grpcPort+1, *config.grpcPort)
} else {
glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, *config.grpcPort)
*config.grpcPort = newPort
}
newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100)
*config.grpcPort = newPort
}
allocatedGrpcPorts[*config.grpcPort] = true
}
}
}
@ -607,11 +586,6 @@ func loadMiniConfigurationFile(dataFolder string) (map[string]string, error) {
// applyConfigFileOptions sets command-line flags from loaded configuration file
func applyConfigFileOptions(options map[string]string) {
for key, value := range options {
// Skip port flags that were explicitly passed on CLI
if explicitPortFlags[key] {
glog.V(2).Infof("Skipping config file option %s=%s (explicitly specified on command line)", key, value)
continue
}
// Set the flag value if it hasn't been explicitly set on command line
flag := cmdMini.Flag.Lookup(key)
if flag != nil {
@ -674,14 +648,6 @@ func saveMiniConfiguration(dataFolder string) error {
func runMini(cmd *Command, args []string) bool {
// Capture which port flags were explicitly passed on CLI BEFORE config file is applied
// This is necessary to distinguish user-specified ports from defaults or config file options
explicitPortFlags = make(map[string]bool)
portFlagNames := []string{"master.port", "filer.port", "volume.port", "s3.port", "webdav.port", "admin.port"}
for _, flagName := range portFlagNames {
explicitPortFlags[flagName] = isFlagPassed(flagName)
}
// Load configuration from file if it exists
configOptions, err := loadMiniConfigurationFile(*miniDataFolders)
if err != nil {
@ -699,14 +665,14 @@ func runMini(cmd *Command, args []string) bool {
grace.SetupProfiling(*miniOptions.cpuprofile, *miniOptions.memprofile)
// Determine bind IP
bindIp := getBindIp()
// Determine bind IP (same logic as below)
bindIp := *miniIp
if *miniBindIp != "" {
bindIp = *miniBindIp
}
// Ensure all ports are available, find alternatives if needed
if err := ensureAllPortsAvailableOnIP(bindIp); err != nil {
glog.Errorf("Port allocation failed: %v", err)
os.Exit(1)
}
ensureAllPortsAvailableOnIP(bindIp)
// Set master.peers to "none" if not specified (single master mode)
if *miniMasterOptions.peers == "" {
@ -801,16 +767,13 @@ func runMini(cmd *Command, args []string) bool {
// startMiniServices starts all mini services with proper dependency coordination
func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
// Determine bind IP for health checks
bindIp := getBindIp()
// Start Master server (no dependencies)
go startMiniService("Master", func() {
startMaster(miniMasterOptions, miniWhiteList)
}, *miniMasterOptions.port)
// Wait for master to be ready
waitForServiceReady("Master", *miniMasterOptions.port, bindIp)
waitForServiceReady("Master", *miniMasterOptions.port)
// Start Volume server (depends on master)
go startMiniService("Volume", func() {
@ -819,7 +782,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniOptions.v.port)
// Wait for volume to be ready
waitForServiceReady("Volume", *miniOptions.v.port, bindIp)
waitForServiceReady("Volume", *miniOptions.v.port)
// Start Filer (depends on master and volume)
go startMiniService("Filer", func() {
@ -827,7 +790,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniFilerOptions.port)
// Wait for filer to be ready
waitForServiceReady("Filer", *miniFilerOptions.port, bindIp)
waitForServiceReady("Filer", *miniFilerOptions.port)
// Start S3 and WebDAV in parallel (both depend on filer)
go startMiniService("S3", func() {
@ -839,8 +802,8 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniWebDavOptions.port)
// Wait for both S3 and WebDAV to be ready
waitForServiceReady("S3", *miniS3Options.port, bindIp)
waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp)
waitForServiceReady("S3", *miniS3Options.port)
waitForServiceReady("WebDAV", *miniWebDavOptions.port)
// Start Admin with worker (depends on master, filer, S3, WebDAV)
go startMiniAdminWithWorker(allServicesReady)
@ -853,8 +816,8 @@ func startMiniService(name string, fn func(), port int) {
}
// waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections
func waitForServiceReady(name string, port int, bindIp string) {
address := fmt.Sprintf("http://%s:%d", bindIp, port)
func waitForServiceReady(name string, port int) {
address := fmt.Sprintf("http://127.0.0.1:%d", port)
maxAttempts := 30 // 30 * 200ms = 6 seconds max wait
attempt := 0
client := &http.Client{
@ -932,28 +895,11 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// Set admin options
*miniAdminOptions.master = masterAddr
// gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini
// If it's still 0, that indicates a problem with the port initialization sequence
// This defensive fallback handles edge cases where port initialization may have been skipped
// or failed silently (e.g., due to configuration changes or error handling paths)
// Note: gRPC port should already be initialized by ensureAllPortsAvailableOnIP
// only set it here if it's still 0 (shouldn't happen in normal flow)
if *miniAdminOptions.grpcPort == 0 {
glog.Warningf("Admin gRPC port was not initialized before startAdminServer, attempting fallback initialization...")
// Use the same availability checking logic as initializeGrpcPortsOnIP
calculatedPort := *miniAdminOptions.port + GrpcPortOffset
if !isPortOpenOnIP(getBindIp(), calculatedPort) || !isPortAvailable(calculatedPort) {
glog.Warningf("Calculated fallback gRPC port %d is not available, finding alternative...", calculatedPort)
newPort := findAvailablePortOnIP(getBindIp(), calculatedPort+1, 100, make(map[int]bool))
if newPort == 0 {
glog.Errorf("Could not find available gRPC port for Admin starting from %d, will use calculated %d and fail on binding", calculatedPort+1, calculatedPort)
*miniAdminOptions.grpcPort = calculatedPort
} else {
glog.Infof("Fallback: using gRPC port %d for Admin", newPort)
*miniAdminOptions.grpcPort = newPort
}
} else {
*miniAdminOptions.grpcPort = calculatedPort
glog.Infof("Fallback: Admin gRPC port initialized to %d", calculatedPort)
}
*miniAdminOptions.grpcPort = *miniAdminOptions.port + 10000
glog.V(1).Infof("Admin gRPC port was 0, calculated to %d", *miniAdminOptions.grpcPort)
}
// Create data directory if specified

Loading…
Cancel
Save