diff --git a/test/s3tables/catalog/test_rest_catalog.py b/test/s3tables/catalog/test_rest_catalog.py index b07ad30e4..117af36df 100644 --- a/test/s3tables/catalog/test_rest_catalog.py +++ b/test/s3tables/catalog/test_rest_catalog.py @@ -188,16 +188,30 @@ def main(): print(f"Prefix: {args.prefix}") print() - # Load the REST catalog - catalog = load_catalog( - "rest", - **{ - "type": "rest", - "uri": args.catalog_url, - "warehouse": args.warehouse, - "prefix": args.prefix, - } - ) + # Load the REST catalog with retries to handle possible delay in catalog server readiness + import time + max_retries = 10 + catalog = None + for attempt in range(max_retries): + try: + catalog = load_catalog( + "rest", + **{ + "type": "rest", + "uri": args.catalog_url, + "warehouse": args.warehouse, + "prefix": args.prefix, + } + ) + print(f"Successfully connected to catalog on attempt {attempt + 1}") + break + except Exception as e: + if attempt < max_retries - 1: + print(f" Attempt {attempt + 1} failed, retrying in 2s... ({e})") + time.sleep(2) + else: + print(f" All {max_retries} attempts failed.") + raise e # Run tests tests = [ diff --git a/weed/command/mini.go b/weed/command/mini.go index 48308cc9b..956872de6 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -888,9 +888,12 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniWebDavOptions.port) } - // Wait for both S3 and WebDAV to be ready + // Wait for services to be ready if *miniEnableS3 { waitForServiceReady("S3", *miniS3Options.port, bindIp) + if miniS3Options.portIceberg != nil && *miniS3Options.portIceberg > 0 { + waitForServiceReady("Iceberg", *miniS3Options.portIceberg, bindIp) + } } if *miniEnableWebDAV { waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp) @@ -909,6 +912,7 @@ func startMiniService(name string, fn func(), port int) { // waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections func waitForServiceReady(name string, port int, bindIp string) { address := fmt.Sprintf("http://%s:%d", bindIp, port) + healthAddr := getHealthCheckAddr(address) maxAttempts := 30 // 30 * 200ms = 6 seconds max wait attempt := 0 client := &http.Client{ @@ -916,7 +920,7 @@ func waitForServiceReady(name string, port int, bindIp string) { } for attempt < maxAttempts { - resp, err := client.Get(address) + resp, err := client.Get(healthAddr) if err == nil { resp.Body.Close() glog.Infof("%s service is ready at %s", name, address) @@ -1022,7 +1026,7 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready func waitForAdminServerReady(adminAddr string) error { - healthAddr := fmt.Sprintf("%s/health", adminAddr) + healthAddr := getHealthCheckAddr(fmt.Sprintf("%s/health", adminAddr)) maxAttempts := 60 // 60 * 500ms = 30 seconds max wait attempt := 0 client := &http.Client{ @@ -1033,7 +1037,7 @@ func waitForAdminServerReady(adminAddr string) error { resp, err := client.Get(healthAddr) if err == nil { resp.Body.Close() - glog.V(1).Infof("Admin server is ready at %s", adminAddr) + glog.Infof("Admin server is ready at %s", adminAddr) return nil } attempt++ @@ -1042,6 +1046,12 @@ func waitForAdminServerReady(adminAddr string) error { return fmt.Errorf("admin server did not become ready at %s after %d attempts", adminAddr, maxAttempts) } +func getHealthCheckAddr(addr string) string { + if strings.Contains(addr, "://0.0.0.0:") { + return strings.Replace(addr, "://0.0.0.0:", "://127.0.0.1:", 1) + } + return addr +} // waitForWorkerReady polls the worker's gRPC port to ensure the worker has fully initialized func waitForWorkerReady(workerGrpcAddr string) {