From 0afc675a556e4ddb6f13f719b0a739fc0f3bbd53 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 15 Mar 2026 17:45:55 -0700 Subject: [PATCH] iceberg: validate filer failover targets (#8637) * iceberg: validate filer failover targets * iceberg: tighten filer liveness checks * iceberg: relax filer test readiness deadline --- weed/plugin/worker/iceberg/exec_test.go | 74 ++++++++++++++++++++++++- weed/plugin/worker/iceberg/handler.go | 37 ++++++++++--- 2 files changed, 102 insertions(+), 9 deletions(-) diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 32789df88..963ac5248 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -153,8 +153,23 @@ func (f *fakeFilerServer) DeleteEntry(_ context.Context, req *filer_pb.DeleteEnt return &filer_pb.DeleteEntryResponse{}, nil } +func (f *fakeFilerServer) Ping(_ context.Context, _ *filer_pb.PingRequest) (*filer_pb.PingResponse, error) { + now := time.Now().UnixNano() + return &filer_pb.PingResponse{ + StartTimeNs: now, + RemoteTimeNs: now, + StopTimeNs: now, + }, nil +} + // startFakeFiler starts a gRPC server and returns a connected client. func startFakeFiler(t *testing.T) (*fakeFilerServer, filer_pb.SeaweedFilerClient) { + t.Helper() + fakeServer, client, _ := startFakeFilerWithAddress(t) + return fakeServer, client +} + +func startFakeFilerWithAddress(t *testing.T) (*fakeFilerServer, filer_pb.SeaweedFilerClient, string) { t.Helper() fakeServer := newFakeFilerServer() @@ -175,7 +190,26 @@ func startFakeFiler(t *testing.T) (*fakeFilerServer, filer_pb.SeaweedFilerClient } t.Cleanup(func() { conn.Close() }) - return fakeServer, filer_pb.NewSeaweedFilerClient(conn) + client := filer_pb.NewSeaweedFilerClient(conn) + deadline := time.Now().Add(5 * time.Second) + for { + pingCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + _, err := client.Ping(pingCtx, &filer_pb.PingRequest{}) + cancel() + if err == nil { + break + } + if time.Now().After(deadline) { + t.Fatalf("filer not ready: %v", err) + } + code := status.Code(err) + if code != codes.Unavailable && code != codes.DeadlineExceeded && code != codes.Canceled { + t.Fatalf("unexpected filer readiness error: %v", err) + } + time.Sleep(10 * time.Millisecond) + } + + return fakeServer, client, listener.Addr().String() } // --------------------------------------------------------------------------- @@ -860,6 +894,44 @@ func TestDetectWithFilters(t *testing.T) { } } +func TestConnectToFilerSkipsUnreachableAddresses(t *testing.T) { + handler := NewHandler(grpc.WithTransportCredentials(insecure.NewCredentials())) + _, _, liveAddr := startFakeFilerWithAddress(t) + + deadListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen for dead address: %v", err) + } + deadAddr := deadListener.Addr().String() + _ = deadListener.Close() + + addr, conn, err := handler.connectToFiler(context.Background(), []string{deadAddr, liveAddr}) + if err != nil { + t.Fatalf("connectToFiler failed: %v", err) + } + defer conn.Close() + + if addr != liveAddr { + t.Fatalf("expected live address %q, got %q", liveAddr, addr) + } +} + +func TestConnectToFilerFailsWhenAllAddressesAreUnreachable(t *testing.T) { + handler := NewHandler(grpc.WithTransportCredentials(insecure.NewCredentials())) + + deadListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen for dead address: %v", err) + } + deadAddr := deadListener.Addr().String() + _ = deadListener.Close() + + _, _, err = handler.connectToFiler(context.Background(), []string{deadAddr}) + if err == nil { + t.Fatal("expected connectToFiler to fail") + } +} + func TestStalePlanGuard(t *testing.T) { fs, client := startFakeFiler(t) diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index b43354892..37cbf6860 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -5,8 +5,10 @@ import ( "fmt" "path" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" @@ -31,6 +33,8 @@ type Handler struct { grpcDialOption grpc.DialOption } +const filerConnectTimeout = 5 * time.Second + // NewHandler creates a new handler for iceberg table maintenance. func NewHandler(grpcDialOption grpc.DialOption) *Handler { return &Handler{grpcDialOption: grpcDialOption} @@ -205,7 +209,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, + "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, "min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}}, "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, @@ -227,7 +231,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { JobTypeMaxRuntimeSeconds: 3600, // 1 hour max }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ - "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, + "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, "min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}}, "snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}}, "max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}}, @@ -272,7 +276,7 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq tableFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "table_filter", "")) // Connect to filer — try each address until one succeeds. - filerAddress, conn, err := h.connectToFiler(filerAddresses) + filerAddress, conn, err := h.connectToFiler(ctx, filerAddresses) if err != nil { return fmt.Errorf("connect to filer: %w", err) } @@ -382,7 +386,7 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ } // Connect to filer - conn, err := grpc.NewClient(filerAddress, h.grpcDialOption) + conn, err := h.dialFiler(ctx, filerAddress) if err != nil { return fmt.Errorf("connect to filer %s: %w", filerAddress, err) } @@ -488,13 +492,30 @@ func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error }) } +func (h *Handler) dialFiler(ctx context.Context, address string) (*grpc.ClientConn, error) { + opCtx, opCancel := context.WithTimeout(ctx, filerConnectTimeout) + defer opCancel() + + conn, err := pb.GrpcDial(opCtx, address, false, h.grpcDialOption) + if err != nil { + return nil, err + } + + client := filer_pb.NewSeaweedFilerClient(conn) + if _, err := client.Ping(opCtx, &filer_pb.PingRequest{}); err != nil { + _ = conn.Close() + return nil, err + } + + return conn, nil +} + // connectToFiler tries each filer address in order and returns the first -// successful gRPC connection. If all addresses fail, it returns a -// consolidated error. -func (h *Handler) connectToFiler(addresses []string) (string, *grpc.ClientConn, error) { +// address whose gRPC connection and Ping request succeed. +func (h *Handler) connectToFiler(ctx context.Context, addresses []string) (string, *grpc.ClientConn, error) { var lastErr error for _, addr := range addresses { - conn, err := grpc.NewClient(addr, h.grpcDialOption) + conn, err := h.dialFiler(ctx, addr) if err != nil { lastErr = fmt.Errorf("filer %s: %w", addr, err) continue