Browse Source

fix flags

pull/8388/head
Chris Lu 1 day ago
parent
commit
e9e7b29d2f
  1. 99
      weed/command/mini.go
  2. 1
      weed/util/fla9/fla9.go

99
weed/command/mini.go

@ -461,6 +461,7 @@ func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string, reserv
// Returns an error if an explicitly specified port is unavailable.
// This should be called before starting any services
func ensureAllPortsAvailableOnIP(bindIp string) error {
glog.V(1).Infof("ensureAllPortsAvailableOnIP started for %s", bindIp)
portConfigs := []struct {
port *int
name string
@ -530,7 +531,9 @@ func ensureAllPortsAvailableOnIP(bindIp string) error {
// Initialize all gRPC ports before services start
// This ensures they won't be recalculated and cause conflicts
// All gRPC port handling (calculation, validation, and assignment) is performed exclusively in initializeGrpcPortsOnIP
initializeGrpcPortsOnIP(bindIp)
if err := initializeGrpcPortsOnIP(bindIp); err != nil {
return err
}
// Log the final port configuration
icebergPortStr := "disabled"
@ -552,7 +555,7 @@ func ensureAllPortsAvailableOnIP(bindIp string) error {
// initializeGrpcPortsOnIP initializes all gRPC ports based on their HTTP ports on a specific IP
// If a gRPC port is 0, it will be set to httpPort + GrpcPortOffset
// This must be called after HTTP ports are finalized and before services start
func initializeGrpcPortsOnIP(bindIp string) {
func initializeGrpcPortsOnIP(bindIp string) error {
// Track gRPC ports allocated during this function to prevent collisions between services
// when multiple services need fallback port allocation
allocatedGrpcPorts := make(map[int]bool)
@ -581,9 +584,28 @@ func initializeGrpcPortsOnIP(bindIp string) {
for _, config := range grpcConfigs {
if config.grpcPort == nil {
glog.V(1).Infof("Skipping gRPC port initialization for %s (grpcPort is nil)", config.name)
continue
}
glog.V(1).Infof("Initializing gRPC port for %s: current value %d, http port %d", config.name, *config.grpcPort, *config.httpPort)
// Check if gRPC port was explicitly passed
grpcFlagName := ""
switch config.name {
case "Master":
grpcFlagName = "master.port.grpc"
case "Filer":
grpcFlagName = "filer.port.grpc"
case "Volume":
grpcFlagName = "volume.port.grpc"
case "S3":
grpcFlagName = "s3.port.grpc"
case "Admin":
grpcFlagName = "admin.port.grpc"
}
isExplicitlyPassed := grpcFlagName != "" && isFlagPassed(grpcFlagName)
// If gRPC port is 0, calculate it from HTTP port
if *config.grpcPort == 0 {
*config.grpcPort = *config.httpPort + GrpcPortOffset
@ -592,6 +614,9 @@ func initializeGrpcPortsOnIP(bindIp string) {
// Verify the gRPC port is available (whether calculated or explicitly set)
// Check on both specific IP and all interfaces, and check against already allocated ports
if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] {
if isExplicitlyPassed {
return fmt.Errorf("explicitly specified gRPC port %d for %s is not available", *config.grpcPort, config.name)
}
glog.Warningf("gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name)
originalPort := *config.grpcPort
newPort := findAvailablePortOnIP(bindIp, originalPort+1, 100, allocatedGrpcPorts)
@ -603,8 +628,9 @@ func initializeGrpcPortsOnIP(bindIp string) {
}
}
allocatedGrpcPorts[*config.grpcPort] = true
glog.V(1).Infof("%s gRPC port set to %d", config.name, *config.grpcPort)
glog.Infof("%s gRPC port finalized to %d", config.name, *config.grpcPort)
}
return nil
}
// loadMiniConfigurationFile reads the mini.options file and returns parsed options
@ -728,10 +754,22 @@ func saveMiniConfiguration(dataFolder string) error {
func runMini(cmd *Command, args []string) bool {
glog.V(1).Infof("--- runMini started (IP: %s) ---", *miniIp)
glog.V(1).Infof("Initial ports - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d",
*miniMasterOptions.port, *miniFilerOptions.port, *miniOptions.v.port,
*miniS3Options.port, *miniAdminOptions.port)
glog.V(1).Infof("Initial gRPC ports - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d",
*miniMasterOptions.portGrpc, *miniFilerOptions.portGrpc, *miniOptions.v.portGrpc,
*miniS3Options.portGrpc, *miniAdminOptions.grpcPort)
// Capture which port flags were explicitly passed on CLI BEFORE config file is applied
// This is necessary to distinguish user-specified ports from defaults or config file options
explicitPortFlags = make(map[string]bool)
portFlagNames := []string{"master.port", "filer.port", "volume.port", "s3.port", "s3.port.iceberg", "webdav.port", "admin.port", "s3.iam.readOnly"}
portFlagNames := []string{
"master.port", "filer.port", "volume.port", "s3.port", "s3.port.iceberg", "webdav.port", "admin.port",
"master.port.grpc", "filer.port.grpc", "volume.port.grpc", "s3.port.grpc", "admin.port.grpc",
"s3.iam.readOnly",
}
for _, flagName := range portFlagNames {
explicitPortFlags[flagName] = isFlagPassed(flagName)
}
@ -759,7 +797,7 @@ func runMini(cmd *Command, args []string) bool {
// Ensure all ports are available, find alternatives if needed
if err := ensureAllPortsAvailableOnIP(bindIp); err != nil {
glog.Errorf("Port allocation failed: %v", err)
os.Exit(1)
return false
}
// Set master.peers to "none" if not specified (single master mode)
@ -860,6 +898,11 @@ func runMini(cmd *Command, args []string) bool {
// startMiniServices starts all mini services with proper dependency coordination
func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
ctx := MiniClusterCtx
if ctx == nil {
ctx = context.Background()
}
// Determine bind IP for health checks
bindIp := getBindIp()
@ -869,7 +912,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniMasterOptions.port)
// Wait for master to be ready
waitForServiceReady("Master", *miniMasterOptions.port, bindIp)
if !waitForServiceReady(ctx, "Master", *miniMasterOptions.port, bindIp) {
return
}
// Start Volume server (depends on master)
go startMiniService("Volume", func() {
@ -878,7 +923,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniOptions.v.port)
// Wait for volume to be ready
waitForServiceReady("Volume", *miniOptions.v.port, bindIp)
if !waitForServiceReady(ctx, "Volume", *miniOptions.v.port, bindIp) {
return
}
// Start Filer (depends on master and volume)
go startMiniService("Filer", func() {
@ -886,7 +933,9 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniFilerOptions.port)
// Wait for filer to be ready
waitForServiceReady("Filer", *miniFilerOptions.port, bindIp)
if !waitForServiceReady(ctx, "Filer", *miniFilerOptions.port, bindIp) {
return
}
// Start S3 and WebDAV in parallel (both depend on filer)
if *miniEnableS3 {
@ -903,13 +952,19 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
// Wait for services to be ready
if *miniEnableS3 {
waitForServiceReady("S3", *miniS3Options.port, bindIp)
if !waitForServiceReady(ctx, "S3", *miniS3Options.port, bindIp) {
return
}
if miniS3Options.portIceberg != nil && *miniS3Options.portIceberg > 0 {
waitForServiceReady("Iceberg", *miniS3Options.portIceberg, bindIp)
if !waitForServiceReady(ctx, "Iceberg", *miniS3Options.portIceberg, bindIp) {
return
}
}
}
if *miniEnableWebDAV {
waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp)
if !waitForServiceReady(ctx, "WebDAV", *miniWebDavOptions.port, bindIp) {
return
}
}
// Start Admin with worker (depends on master, filer, S3, WebDAV)
@ -923,7 +978,7 @@ func startMiniService(name string, fn func(), port int) {
}
// waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections
func waitForServiceReady(name string, port int, bindIp string) {
func waitForServiceReady(ctx context.Context, name string, port int, bindIp string) bool {
address := fmt.Sprintf("http://%s:%d", bindIp, port)
healthAddr := getHealthCheckAddr(address)
maxAttempts := 30 // 30 * 200ms = 6 seconds max wait
@ -933,19 +988,30 @@ func waitForServiceReady(name string, port int, bindIp string) {
}
for attempt < maxAttempts {
select {
case <-ctx.Done():
return false
default:
}
resp, err := client.Get(healthAddr)
if err == nil {
resp.Body.Close()
glog.Infof("%s service is ready at %s", name, address)
return
return true
}
attempt++
time.Sleep(200 * time.Millisecond)
select {
case <-ctx.Done():
return false
case <-time.After(200 * time.Millisecond):
}
}
// Service failed to become ready, log warning but don't fail startup
// (services may still work even if health check endpoint isn't responding immediately)
glog.Warningf("Health check for %s failed (service may still be functional, retries may succeed)", name)
return true
}
// startS3Service initializes and starts the S3 server
@ -1002,6 +1068,11 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini
// If it's still 0, that indicates a problem with the port initialization sequence
if *miniAdminOptions.grpcPort == 0 {
// If context is already cancelled, don't fatal - we're shutting down anyway
if ctx.Err() != nil {
glog.Infof("Admin gRPC port was 0, but context is cancelled. Skipping startup.")
return
}
glog.Fatalf("Admin gRPC port was not initialized before startAdminServer. This indicates a problem with the port initialization sequence.")
}

1
weed/util/fla9/fla9.go

@ -905,6 +905,7 @@ func (f *FlagSet) parseOne() (bool, error) {
// are defined and before flags are accessed by the program.
// The return value will be ErrHelp if -help or -h were set but not defined.
func (f *FlagSet) Parse(arguments []string) error {
f.actual = nil
if _, ok := f.formal[DefaultConfigFlagName]; !ok {
f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format")
}

Loading…
Cancel
Save