diff --git a/test/s3/spark/setup_test.go b/test/s3/spark/setup_test.go index 79053a0ab..8be585675 100644 --- a/test/s3/spark/setup_test.go +++ b/test/s3/spark/setup_test.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "math/rand" - "net" "os" "os/exec" "path/filepath" @@ -96,9 +94,9 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = mustFreePort(t, "Master") - env.filerPort = mustFreePort(t, "Filer") - env.s3Port = mustFreePort(t, "S3") + env.masterPort = testutil.MustFreeMiniPort(t, "Master") + env.filerPort = testutil.MustFreeMiniPort(t, "Filer") + env.s3Port = testutil.MustFreeMiniPort(t, "S3") bindIP := testutil.FindBindIP() iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) @@ -135,14 +133,14 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { } registerMiniProcess(env.masterProcess) - if !waitForPort(env.masterPort, 15*time.Second) { + if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) } - if !waitForPort(env.filerPort, 15*time.Second) { + if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) } - if !waitForPort(env.s3Port, 15*time.Second) { - t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port) } } @@ -208,46 +206,6 @@ func (env *TestEnvironment) Cleanup(t *testing.T) { } } -func mustFreePort(t *testing.T, name string) int { - t.Helper() - - for i := 0; i < 200; i++ { - port := 20000 + rand.Intn(30000) - listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) - if err != nil { - continue - } - _ = listener.Close() - - grpcPort := port + 10000 - if grpcPort > 65535 { - continue - } - grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort)) - if err != nil { - continue - } - _ = grpcListener.Close() - return port - } - - t.Fatalf("failed to get free port for %s", name) - return 0 -} - -func waitForPort(port int, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) - if err == nil { - _ = conn.Close() - return true - } - time.Sleep(100 * time.Millisecond) - } - return false -} - func runSparkPyScript(t *testing.T, container testcontainers.Container, script string, s3Port int) (int, string) { t.Helper() diff --git a/test/s3tables/catalog_risingwave/setup_test.go b/test/s3tables/catalog_risingwave/setup_test.go index 47a4bbf89..58c6ed757 100644 --- a/test/s3tables/catalog_risingwave/setup_test.go +++ b/test/s3tables/catalog_risingwave/setup_test.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "math/rand" - "net" "net/http" "os" "os/exec" @@ -115,11 +114,11 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = mustFreePort(t, "Master") - env.filerPort = mustFreePort(t, "Filer") - env.s3Port = mustFreePort(t, "S3") - env.icebergRestPort = mustFreePort(t, "Iceberg") - env.risingwavePort = mustFreePort(t, "RisingWave") + env.masterPort = testutil.MustFreeMiniPort(t, "Master") + env.filerPort = testutil.MustFreeMiniPort(t, "Filer") + env.s3Port = testutil.MustFreeMiniPort(t, "S3") + env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg") + env.risingwavePort = testutil.MustFreeMiniPort(t, "RisingWave") env.bindIP = testutil.FindBindIP() @@ -161,62 +160,20 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { registerMiniProcess(env.masterProcess) // Wait for all services to be ready - if !waitForPort(env.masterPort, 15*time.Second) { + if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) } - if !waitForPort(env.filerPort, 15*time.Second) { + if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) } - if !waitForPort(env.s3Port, 15*time.Second) { - t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port) } - if !waitForPort(env.icebergRestPort, 15*time.Second) { - t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergRestPort), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("weed mini failed to start - iceberg rest endpoint http://127.0.0.1:%d/v1/config not responding", env.icebergRestPort) } } -func mustFreePort(t *testing.T, name string) int { - t.Helper() - minPort := 10000 - maxPort := 55000 // Ensure port+10000 < 65535 - r := rand.New(rand.NewSource(time.Now().UnixNano())) - - for i := 0; i < 1000; i++ { - port := minPort + r.Intn(maxPort-minPort) - - // Check http port - ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if err != nil { - continue - } - ln.Close() - - // Check grpc port (weed mini uses port+10000) - ln2, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port+10000)) - if err != nil { - continue - } - ln2.Close() - - return port - } - t.Fatalf("failed to find a free port < %d for %s after 1000 attempts", maxPort, name) - return 0 -} - -func waitForPort(port int, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) - if err == nil { - conn.Close() - return true - } - time.Sleep(100 * time.Millisecond) - } - return false -} - func (env *TestEnvironment) StartRisingWave(t *testing.T) { t.Helper() @@ -253,7 +210,7 @@ func (env *TestEnvironment) StartRisingWave(t *testing.T) { } // Wait for RisingWave port to be open on host - if !waitForPort(env.risingwavePort, 120*time.Second) { + if !testutil.WaitForPort(env.risingwavePort, 120*time.Second) { t.Fatalf("timed out waiting for RisingWave port %d to be open", env.risingwavePort) } diff --git a/test/s3tables/catalog_spark/setup_test.go b/test/s3tables/catalog_spark/setup_test.go index 50f3017a7..1b1e323c2 100644 --- a/test/s3tables/catalog_spark/setup_test.go +++ b/test/s3tables/catalog_spark/setup_test.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "math/rand" - "net" "os" "os/exec" "path/filepath" @@ -91,10 +90,10 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = mustFreePort(t, "Master") - env.filerPort = mustFreePort(t, "Filer") - env.s3Port = mustFreePort(t, "S3") - env.icebergRestPort = mustFreePort(t, "Iceberg") + env.masterPort = testutil.MustFreeMiniPort(t, "Master") + env.filerPort = testutil.MustFreeMiniPort(t, "Filer") + env.s3Port = testutil.MustFreeMiniPort(t, "S3") + env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg") bindIP := testutil.FindBindIP() @@ -127,58 +126,20 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { registerMiniProcess(env.masterProcess) // Wait for all services to be ready - if !waitForPort(env.masterPort, 15*time.Second) { + if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) } - if !waitForPort(env.filerPort, 15*time.Second) { + if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) { t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) } - if !waitForPort(env.s3Port, 15*time.Second) { - t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port) } - if !waitForPort(env.icebergRestPort, 15*time.Second) { - t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) + if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergRestPort), testutil.SeaweedMiniStartupTimeout) { + t.Fatalf("weed mini failed to start - iceberg rest endpoint http://127.0.0.1:%d/v1/config not responding", env.icebergRestPort) } } -func mustFreePort(t *testing.T, name string) int { - t.Helper() - - for i := 0; i < 200; i++ { - port := 20000 + rand.Intn(30000) - listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) - if err != nil { - continue - } - listener.Close() - grpcPort := port + 10000 - if grpcPort > 65535 { - continue - } - grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort)) - if err != nil { - continue - } - grpcListener.Close() - return port - } - t.Fatalf("failed to get free port for %s", name) - return 0 -} - -func waitForPort(port int, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) - if err == nil { - conn.Close() - return true - } - time.Sleep(100 * time.Millisecond) - } - return false -} - func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string { t.Helper() diff --git a/test/s3tables/testutil/docker.go b/test/s3tables/testutil/docker.go index eff2b2ac1..a92472642 100644 --- a/test/s3tables/testutil/docker.go +++ b/test/s3tables/testutil/docker.go @@ -2,6 +2,7 @@ package testutil import ( "context" + "fmt" "net" "net/http" "os/exec" @@ -64,3 +65,17 @@ func WaitForService(url string, timeout time.Duration) bool { } } } + +func WaitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + address := fmt.Sprintf("127.0.0.1:%d", port) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond) + if err == nil { + _ = conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} diff --git a/test/s3tables/testutil/weed_mini.go b/test/s3tables/testutil/weed_mini.go index ff8260ef0..1c661b698 100644 --- a/test/s3tables/testutil/weed_mini.go +++ b/test/s3tables/testutil/weed_mini.go @@ -2,11 +2,16 @@ package testutil import ( "fmt" + "math/rand" "net" "os" "path/filepath" + "testing" + "time" ) +const SeaweedMiniStartupTimeout = 45 * time.Second + func FindBindIP() string { addrs, err := net.InterfaceAddrs() if err != nil { @@ -54,3 +59,35 @@ func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) { } return iamConfigPath, nil } + +func MustFreeMiniPort(t *testing.T, name string) int { + t.Helper() + + const ( + minPort = 10000 + maxPort = 55000 + ) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 1000; i++ { + port := minPort + r.Intn(maxPort-minPort) + + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + continue + } + _ = listener.Close() + + grpcPort := port + 10000 + grpcListener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) + if err != nil { + continue + } + _ = grpcListener.Close() + + return port + } + + t.Fatalf("failed to get free weed mini port for %s", name) + return 0 +}