From 2ec0a67ee3d7e6c9c3e78ca941a73c2c4492a33e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 8 Mar 2026 16:05:45 -0700
Subject: [PATCH] master: return 503/Unavailable during topology warmup after leader change (#8529)

* master: return 503/Unavailable during topology warmup after leader change

After a master restart or leader change, the topology is empty until
volume servers reconnect and send heartbeats. During this warmup window
(3 heartbeat intervals, i.e. 15 seconds at the default 5-second pulse),
volume lookups that fail now return 503 Service Unavailable (HTTP) or
gRPC Unavailable instead of 404 Not Found, signaling clients to retry
with other masters.

* master: skip warmup 503 on fresh start and single-master setups

- Check MaxVolumeId > 0 to distinguish restart from fresh start
  (MaxVolumeId is Raft-persisted, so 0 means no prior data)
- Check peer count > 1 so single-master deployments aren't affected
  (no point suggesting "retry with other masters" if there are none)

* master: address review feedback and block assigns during warmup

- Protect LastLeaderChangeTime with a dedicated mutex (fix data race)
- Extract warmup multiplier as WarmupPulseMultiplier constant
- Derive Retry-After header from pulse config instead of hardcoding
- Only trigger warmup 503 for "not found" errors, not parse errors
- Return nil response (not partial) on gRPC Unavailable
- Add doc comments to IsWarmingUp, getter/setter, WarmupDuration
- Block volume assign requests (HTTP and gRPC) during warmup, since
  the topology is incomplete and assignments would be unreliable
- Skip warmup behavior for single-master setups (no peers to retry)

* master: apply warmup to all setups, skip only on fresh start

Single-master restarts still have an empty topology until heartbeats
arrive, so warmup protection should apply there too. The only case to
skip is a fresh cluster start (MaxVolumeId == 0), which already has no
volumes to look up.

- Remove GetMasterCount() > 1 guard from all warmup checks
- Remove now-unused GetMasterCount helper
- Update error messages to "topology is still loading" (not "retry with
  other masters", which doesn't apply to single-master)

* master: add client-side retry on Unavailable for lookup and assign

The server-side 503/Unavailable during warmup needs client cooperation.
Previously, LookupVolumeIds and Assign would immediately propagate the
error without retry. Now both paths retry with exponential backoff
(1s -> 1.5s -> ... up to 6s) when receiving Unavailable, respecting
context cancellation. This covers the warmup window where the master's
topology is still loading after a restart or leader change.

* master: seed warmup timestamp in legacy raft path at setup

The legacy raft path only set lastLeaderChangeTime inside the event
listener callback, which could fire after IsLeader() was already
observed as true in SetRaftServer. Seed the timestamp at setup time
(matching the hashicorp path) so IsWarmingUp() is active immediately.

* master: fix assign retry loop to cover full warmup window

The retry loop used waitTime <= maxWaitTime as a stop condition, causing
it to give up after ~13s while warmup lasts 15s. Now cap each individual
sleep at maxWaitTime but keep retrying until the context is cancelled.

* master: preserve gRPC status in lookup retry and fix retry window

Return the raw gRPC error instead of wrapping with fmt.Errorf so
status.FromError() can extract the status code. Use a proper gRPC status
check (codes.Unavailable) instead of string matching. Also cap each
individual sleep at maxWaitTime while retrying until ctx is done.
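As a rough sketch of the intended client behavior (illustrative only, not
part of this patch; the master address and attempt bound are placeholders),
an HTTP caller of /dir/assign would honor the warmup 503 and its
Retry-After header like this:

    package main

    import (
        "fmt"
        "io"
        "net/http"
        "strconv"
        "time"
    )

    // assignWithRetry calls /dir/assign and, on a warmup 503, sleeps for
    // the advertised Retry-After before trying again.
    func assignWithRetry(masterURL string, maxAttempts int) ([]byte, error) {
        for attempt := 0; attempt < maxAttempts; attempt++ {
            resp, err := http.Get(masterURL + "/dir/assign?count=1")
            if err != nil {
                return nil, err
            }
            body, _ := io.ReadAll(resp.Body)
            resp.Body.Close()
            if resp.StatusCode != http.StatusServiceUnavailable {
                return body, nil // assigned, or a non-retriable failure
            }
            // During warmup the master sets Retry-After to the remaining
            // warmup time, rounded up to whole seconds.
            wait := time.Second
            if s, err := strconv.Atoi(resp.Header.Get("Retry-After")); err == nil && s > 0 {
                wait = time.Duration(s) * time.Second
            }
            time.Sleep(wait)
        }
        return nil, fmt.Errorf("master still warming up after %d attempts", maxAttempts)
    }

    func main() {
        body, err := assignWithRetry("http://localhost:9333", 10)
        if err != nil {
            fmt.Println("assign failed:", err)
            return
        }
        fmt.Println(string(body))
    }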
* master: use gRPC status code instead of string matching in assign retry

Use status.FromError/codes.Unavailable instead of brittle
strings.Contains for detecting retriable gRPC errors in the assign retry
loop.

* master: use remaining warmup duration for Retry-After header

Set Retry-After to the remaining warmup time instead of the full warmup
duration, so clients don't wait longer than necessary.

* master: reset ret.Replicas before populating from assign response

Clear the Replicas slice before appending to prevent duplicate entries
when the assign response is retried or when alternative requests are
attempted.

* master: add unit tests for warmup retry behavior

Test that Assign() and LookupVolumeIds() retry on codes.Unavailable and
stop promptly when the context is cancelled.

* master: record leader change time before initialization work

Move SetLastLeaderChangeTime() to fire immediately when the leader
change event is received, before DoBarrier(), EnsureTopologyId(), and
updatePeers(), so the warmup clock starts at the true moment of
leadership transition.

* master: use topology warmup duration in volume growth wait loop

Replace the hardcoded constants.VolumePulsePeriod * 2 with
topo.IsWarmingUp() and topo.WarmupDuration() so the growth wait stays in
sync with the configured warmup window. Remove the now-unused constants
import.

* master: resolve master before creating RPC timeout context

Move the GetMaster() call before context.WithTimeout() so master
resolution blocking doesn't consume the gRPC call timeout.

* master: use NotFound flag instead of string matching for volume lookup

Add a NotFound field to LookupResult and set it in findVolumeLocation
when a volume is genuinely missing. Update the HTTP and gRPC warmup
checks to use this flag instead of strings.Contains on the error
message.

* master: bound assign retry loop to 30s for deadline-free contexts

Without a context deadline, the Unavailable retry loop could spin
forever. Add a maxRetryDuration of 30s so the loop gives up even when no
context deadline is set.

* master: strengthen assign retry cancellation test

Verify the retry loop actually retried (callCount > 1) and that the
returned error is context.DeadlineExceeded, not just any error.

* master: extract shared retry-with-backoff utility

Add util.RetryWithBackoff for context-aware, bounded retry with
exponential backoff. Refactor both Assign() and LookupVolumeIds() to use
it instead of duplicating the retry/sleep/backoff logic.

* master: cap waitTime in RetryWithBackoff to prevent unbounded growth

Cap the backoff waitTime at maxWaitTime so it doesn't grow indefinitely
in long-running retry scenarios.

* master: only return Unavailable during warmup when all lookups failed

For batched LookupVolume requests, return partial results when some
volumes are found. Only return codes.Unavailable when no volumes were
successfully resolved, so clients benefit from partial results instead
of retrying unnecessarily.

* master: set retriable error message in 503 response body

When returning 503 during warmup, replace the "not found" error in the
JSON body with "service warming up, please retry" so clients don't treat
it as a permanent error.

* master: guard empty master address in LookupVolumeIds

If GetMaster() returns empty (no master found or ctx cancelled), return
an appropriate error instead of dialing an empty address. Return
ctx.Err() if the context is done, otherwise codes.Unavailable to trigger
a retry.
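For reference, a minimal caller of the new helper might look as follows;
the signature matches the one added to weed/util/retry.go in this patch,
while lookupOnce is a hypothetical stand-in for one timeout-bounded gRPC
attempt:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/seaweedfs/seaweedfs/weed/util"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    var attempts int

    // lookupOnce simulates a master that is unavailable for the first two
    // attempts; a real caller would issue one bounded LookupVolume RPC here.
    func lookupOnce(ctx context.Context) error {
        attempts++
        if attempts < 3 {
            return status.Errorf(codes.Unavailable, "master is warming up")
        }
        return nil
    }

    func main() {
        ctx := context.Background()
        err := util.RetryWithBackoff(ctx, "lookup", 30*time.Second,
            func(err error) bool {
                // Retry only on gRPC Unavailable, the code the master
                // returns while its topology is still loading.
                st, ok := status.FromError(err)
                return ok && st.Code() == codes.Unavailable
            },
            func() error { return lookupOnce(ctx) },
        )
        fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=3 err=<nil>
    }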
* master: add comprehensive tests for RetryWithBackoff

Test success after retries, non-retryable error handling, context
cancellation, and the maxDuration cap with context.Background().

* master: enforce hard maxDuration bound in RetryWithBackoff

Use a deadline instead of an elapsed-time check so the last sleep is
capped to the remaining time. This prevents the total retry duration
from overshooting maxDuration by up to one full backoff interval.

* master: respect fresh-start bypass in RemainingWarmupDuration

Check IsWarmingUp() first (which returns false when MaxVolumeId == 0)
so RemainingWarmupDuration returns 0 on fresh clusters.

* master: round up Retry-After seconds to avoid underestimating

Use math.Ceil so fractional remaining seconds (e.g. 1.9s) round up to
the next integer (2) instead of flooring down (1).

* master: tighten batch lookup warmup to all-NotFound only

Only return codes.Unavailable when every requested volume ID was a
transient not-found. Mixed cases with non-NotFound errors now return the
response with per-volume error details preserved.

* master: reduce retry log noise and fix timer leak

Lower the per-attempt retry log from V(0) to V(1) to reduce noise during
warmup. Replace time.After with time.NewTimer to avoid lingering timers
when the context is cancelled.

* master: add per-attempt timeout for assign RPC

Use a 10s per-attempt timeout so a single slow RPC can't consume the
entire 30s retry budget when ctx has no deadline.

* master: share single 30s retry deadline across assign request entries

The Assign() function iterates over primary and fallback requests,
previously giving each its own 30s RetryWithBackoff budget. With a
primary + fallback, the total could reach 60s. Compute one deadline up
front and pass the remaining budget to each RetryWithBackoff call so the
entire Assign() call stays within a single 30s cap.

* master: strengthen context-cancel test with DeadlineExceeded and retry assertions

Assert errors.Is(err, context.DeadlineExceeded) to verify the error is
specifically from the context deadline, and check callCount > 1 to prove
retries actually occurred before cancellation. Mirrors the pattern used
in TestAssignStopsOnContextCancel.

* master: bound GetMaster with per-attempt timeout in LookupVolumeIds

GetMaster() calls WaitUntilConnected(), which can block indefinitely if
no master is available. Previously it used the outer ctx, so a slow
master resolution could consume the entire RetryWithBackoff budget in a
single attempt. Move the per-attempt timeoutCtx creation before the
GetMaster call so both master resolution and the gRPC LookupVolume RPC
share one grpcTimeout-bounded attempt.

* master: use deadline-aware context for assign retry budget

The shared 30s deadline only limited RetryWithBackoff's internal
wall-clock tracking, but per-attempt contexts were still derived from
the original ctx and could run for up to 10s even when the budget was
nearly exhausted. Create a deadlineCtx from the computed deadline and
derive both RetryWithBackoff and per-attempt timeouts from it so all
operations honor the shared 30s cap.

* master: skip warmup gate for empty lookup requests

When VolumeOrFileIds is empty, notFoundCount == len(req.VolumeOrFileIds)
is 0 == 0, which is true, causing empty lookup batches during warmup to
return codes.Unavailable and be retried endlessly. Add a
len(req.VolumeOrFileIds) > 0 guard so empty requests pass through.
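A worked example of the Retry-After arithmetic (sketch only; the
5-second pulse is the usual master default and an assumption here, while
WarmupPulseMultiplier matches weed/topology/topology.go in this patch):

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        const pulse = 5                 // seconds; assumed default master heartbeat interval
        const warmupPulseMultiplier = 3 // as defined in weed/topology/topology.go

        warmup := time.Duration(pulse*warmupPulseMultiplier) * time.Second // 15s window

        // Suppose the leader changed 13.1s ago, so 1.9s of warmup remains.
        remaining := warmup - 13100*time.Millisecond

        // math.Ceil rounds fractional seconds up, so the client is told to
        // wait 2s rather than 1s and won't retry into the warmup window.
        retryAfter := int(math.Ceil(remaining.Seconds()))
        fmt.Printf("Retry-After: %d\n", retryAfter) // prints: Retry-After: 2
    }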
* master: validate request fields before warmup gate in Assign

Move Replication and Ttl parsing before the IsWarmingUp() check so
invalid inputs get a proper validation error instead of being masked by
codes.Unavailable during warmup. Pure syntactic validation does not
depend on topology state and should run first.

* master: check deadline and context before starting retry attempt

RetryWithBackoff only checked the deadline and context after an attempt
completed or during the sleep select. If the deadline expired or the
context was canceled during sleep, the next iteration would still call
operation() before detecting it. Add pre-operation checks so no new
attempt starts after the budget is exhausted.

* master: always return ctx.Err() on context cancellation in RetryWithBackoff

When ctx.Err() is non-nil, the pre-operation check was returning lastErr
instead of ctx.Err(). This broke callers checking
errors.Is(err, context.DeadlineExceeded) and contradicted the documented
contract. Always return ctx.Err() so the cancellation reason is properly
surfaced.

* master: handle warmup errors in StreamAssign without killing the stream

StreamAssign was returning codes.Unavailable errors from Assign
directly, which terminates the gRPC stream and breaks pooled
connections. Instead, return transient errors as in-band error responses
so the stream survives warmup periods. Also reset assignClient in
doAssign on Send/Recv failures so a broken stream doesn't leave the
proxy permanently dead.

* master: wait for warmup before slot search in findAndGrow

findEmptySlotsForOneVolume was called before the warmup wait loop,
selecting slots from an incomplete topology. Move the warmup wait before
the slot search so volume placement uses the fully warmed-up topology
with all servers registered.

* master: add Retry-After header to /dir/assign warmup response

The /dir/lookup handler already sets Retry-After during warmup but
/dir/assign did not, leaving HTTP clients without guidance on when to
retry. Add the same header using RemainingWarmupDuration().

* master: only seed warmup timestamp on leader at startup

SetLastLeaderChangeTime was called unconditionally for both leader and
follower nodes. Followers don't need warmup state, and the leader change
event listener handles real elections. Move the seed into the IsLeader()
block so only the startup leader gets warmup initialized.

* master: preserve codes.Unavailable for StreamAssign warmup errors in doAssign

StreamAssign returns transient warmup errors as in-band
AssignResponse.Error messages. doAssign was converting these to plain
fmt.Errorf, losing the codes.Unavailable classification needed for the
caller's retry logic. Detect warmup error messages and wrap them as
status.Error(codes.Unavailable) so RetryWithBackoff can retry.
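Reduced to its essentials, the server and client halves of that in-band
pattern look like the sketch below (assignResponse is a local stand-in
for master_pb.AssignResponse; the full versions appear in the diff that
follows):

    package main

    import (
        "fmt"
        "strings"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // assignResponse stands in for master_pb.AssignResponse.
    type assignResponse struct{ Error string }

    // serverSide models StreamAssign: a transient warmup error is sent as
    // an in-band response instead of terminating the stream.
    func serverSide(assignErr error) (*assignResponse, error) {
        if assignErr != nil {
            if st, ok := status.FromError(assignErr); ok && st.Code() == codes.Unavailable {
                return &assignResponse{Error: st.Message()}, nil // stream survives
            }
            return nil, assignErr // fatal: terminate the stream
        }
        return &assignResponse{}, nil
    }

    // clientSide models doAssign: an in-band warmup message is re-wrapped
    // as codes.Unavailable so retry logic can classify it as retriable.
    func clientSide(resp *assignResponse) error {
        if resp.Error == "" {
            return nil
        }
        if strings.Contains(resp.Error, "warming up") {
            return status.Errorf(codes.Unavailable, "StreamAssignRecv: %s", resp.Error)
        }
        return fmt.Errorf("StreamAssignRecv: %v", resp.Error)
    }

    func main() {
        resp, _ := serverSide(status.Error(codes.Unavailable, "master is warming up, topology is still loading"))
        err := clientSide(resp)
        st, _ := status.FromError(err)
        fmt.Println(st.Code()) // Unavailable: the caller's backoff loop will retry
    }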
--- weed/operation/assign_file_id.go | 110 ++++++++++++------ weed/operation/assign_file_id_retry_test.go | 108 ++++++++++++++++++ weed/operation/lookup.go | 1 + weed/server/master_grpc_server_assign.go | 17 ++- weed/server/master_grpc_server_volume.go | 12 ++ weed/server/master_server.go | 7 +- weed/server/master_server_handlers.go | 27 ++++- weed/server/raft_hashicorp.go | 2 +- weed/topology/topology.go | 54 ++++++++- weed/topology/volume_growth.go | 13 ++- weed/util/retry.go | 57 ++++++++++ weed/util/retry_test.go | 81 +++++++++++++ weed/wdclient/masterclient.go | 119 ++++++++++++-------- weed/wdclient/masterclient_retry_test.go | 119 ++++++++++++++++++++ 14 files changed, 627 insertions(+), 100 deletions(-) create mode 100644 weed/operation/assign_file_id_retry_test.go create mode 100644 weed/wdclient/masterclient_retry_test.go diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 2e429a8a1..7c2c71074 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,14 +3,19 @@ package operation import ( "context" "fmt" + "strings" "sync" + "time" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type VolumeAssignRequest struct { @@ -106,13 +111,21 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri WritableVolumeCount: request.WritableVolumeCount, } if err = ap.assignClient.Send(req); err != nil { + ap.assignClient = nil return nil, fmt.Errorf("StreamAssignSend: %w", err) } resp, grpcErr := ap.assignClient.Recv() if grpcErr != nil { + ap.assignClient = nil return nil, grpcErr } if resp.Error != "" { + // StreamAssign returns transient warmup errors as in-band responses. + // Wrap them as codes.Unavailable so the caller's retry logic can + // classify them as retriable. + if strings.Contains(resp.Error, "warming up") { + return nil, status.Errorf(codes.Unavailable, "StreamAssignRecv: %s", resp.Error) + } return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error) } @@ -149,50 +162,73 @@ func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialO var lastError error ret := &AssignResult{} + // Compute a single deadline so all request entries (primary + fallback) + // share one 30s retry budget instead of each getting its own. + // Use a deadline-aware context so both RetryWithBackoff and per-attempt + // timeouts are bounded by the shared budget. 
+ deadline := time.Now().Add(30 * time.Second) + deadlineCtx, deadlineCancel := context.WithDeadline(ctx, deadline) + defer deadlineCancel() + for i, request := range requests { if request == nil { continue } - lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - req := &master_pb.AssignRequest{ - Count: request.Count, - Replication: request.Replication, - Collection: request.Collection, - Ttl: request.Ttl, - DiskType: request.DiskType, - DataCenter: request.DataCenter, - Rack: request.Rack, - DataNode: request.DataNode, - WritableVolumeCount: request.WritableVolumeCount, - } - resp, grpcErr := masterClient.Assign(ctx, req) - if grpcErr != nil { - return grpcErr - } - - if resp.Error != "" { - return fmt.Errorf("assignRequest: %v", resp.Error) - } + remaining := time.Until(deadline) + if remaining <= 0 { + break + } - ret.Count = resp.Count - ret.Fid = resp.Fid - ret.Url = resp.Location.Url - ret.PublicUrl = resp.Location.PublicUrl - ret.GrpcPort = int(resp.Location.GrpcPort) - ret.Error = resp.Error - ret.Auth = security.EncodedJwt(resp.Auth) - for _, r := range resp.Replicas { - ret.Replicas = append(ret.Replicas, Location{ - Url: r.Url, - PublicUrl: r.PublicUrl, - DataCenter: r.DataCenter, + lastError = util.RetryWithBackoff(deadlineCtx, "assign", remaining, + func(err error) bool { + st, ok := status.FromError(err) + return ok && st.Code() == codes.Unavailable + }, + func() error { + // Per-attempt timeout to prevent a single slow RPC from consuming the entire retry budget + attemptCtx, attemptCancel := context.WithTimeout(deadlineCtx, 10*time.Second) + defer attemptCancel() + return WithMasterServerClient(false, masterFn(attemptCtx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + req := &master_pb.AssignRequest{ + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DiskType: request.DiskType, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, + } + resp, grpcErr := masterClient.Assign(attemptCtx, req) + if grpcErr != nil { + return grpcErr + } + + if resp.Error != "" { + return fmt.Errorf("assignRequest: %v", resp.Error) + } + + ret.Count = resp.Count + ret.Fid = resp.Fid + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) + ret.Error = resp.Error + ret.Auth = security.EncodedJwt(resp.Auth) + ret.Replicas = nil + for _, r := range resp.Replicas { + ret.Replicas = append(ret.Replicas, Location{ + Url: r.Url, + PublicUrl: r.PublicUrl, + DataCenter: r.DataCenter, + }) + } + + return nil }) - } - - return nil - - }) + }) if lastError != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc() diff --git a/weed/operation/assign_file_id_retry_test.go b/weed/operation/assign_file_id_retry_test.go new file mode 100644 index 000000000..ab9263dc0 --- /dev/null +++ b/weed/operation/assign_file_id_retry_test.go @@ -0,0 +1,108 @@ +package operation + +import ( + "context" + "errors" + "fmt" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +// fakeAssignServer returns Unavailable for the first N calls, then succeeds. 
+type fakeAssignServer struct { + master_pb.UnimplementedSeaweedServer + unavailableCount int32 + callCount atomic.Int32 +} + +func (s *fakeAssignServer) Assign(_ context.Context, _ *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { + n := s.callCount.Add(1) + if n <= s.unavailableCount { + return nil, status.Errorf(codes.Unavailable, "master is warming up") + } + return &master_pb.AssignResponse{ + Fid: "1,abc", + Count: 1, + Location: &master_pb.Location{ + Url: "127.0.0.1:8080", + PublicUrl: "127.0.0.1:8080", + }, + }, nil +} + +func startFakeServer(t *testing.T, srv master_pb.SeaweedServer) pb.ServerAddress { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + grpcServer := grpc.NewServer() + master_pb.RegisterSeaweedServer(grpcServer, srv) + go func() { _ = grpcServer.Serve(lis) }() + t.Cleanup(grpcServer.GracefulStop) + + _, port, _ := net.SplitHostPort(lis.Addr().String()) + // Use "0." format so ToGrpcAddress() resolves to the actual port + return pb.ServerAddress(fmt.Sprintf("127.0.0.1:0.%s", port)) +} + +func TestAssignRetriesOnUnavailable(t *testing.T) { + srv := &fakeAssignServer{unavailableCount: 3} + addr := startFakeServer(t, srv) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ret, err := Assign(ctx, func(_ context.Context) pb.ServerAddress { + return addr + }, grpc.WithTransportCredentials(insecure.NewCredentials()), &VolumeAssignRequest{Count: 1}) + + if err != nil { + t.Fatalf("expected success after retries, got error: %v", err) + } + if ret.Fid != "1,abc" { + t.Errorf("expected fid '1,abc', got '%s'", ret.Fid) + } + if calls := srv.callCount.Load(); calls != 4 { + t.Errorf("expected 4 calls (3 unavailable + 1 success), got %d", calls) + } +} + +func TestAssignStopsOnContextCancel(t *testing.T) { + srv := &fakeAssignServer{unavailableCount: 1000} // never succeeds + addr := startFakeServer(t, srv) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + start := time.Now() + _, err := Assign(ctx, func(_ context.Context) pb.ServerAddress { + return addr + }, grpc.WithTransportCredentials(insecure.NewCredentials()), &VolumeAssignRequest{Count: 1}) + + elapsed := time.Since(start) + if err == nil { + t.Fatal("expected error from context cancellation") + } + // Should stop within a reasonable time after context deadline + if elapsed > 5*time.Second { + t.Errorf("took %v, expected to stop near context deadline of 2s", elapsed) + } + // Verify the loop actually retried (not just an immediate failure) + if calls := srv.callCount.Load(); calls <= 1 { + t.Errorf("expected multiple retry attempts, got %d calls", calls) + } + // Verify the error is from context deadline + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("expected context.DeadlineExceeded, got: %v", err) + } +} diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 12ad37330..aedb8f8ee 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -30,6 +30,7 @@ type LookupResult struct { Locations []Location `json:"locations,omitempty"` Jwt string `json:"jwt,omitempty"` Error string `json:"error,omitempty"` + NotFound bool `json:"-"` } func (lr *LookupResult) String() string { diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go index 081d79cc1..4911c49a3 100644 --- a/weed/server/master_grpc_server_assign.go +++ b/weed/server/master_grpc_server_assign.go @@ -17,6 +17,8 @@ 
import ( "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/topology" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer) error { @@ -28,8 +30,15 @@ func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer } resp, err := ms.Assign(context.Background(), req) if err != nil { - glog.Errorf("StreamAssign failed to assign: %v", err) - return err + // Return transient errors (e.g. warmup) as in-band error responses + // instead of killing the stream, so pooled connections survive. + if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable { + glog.V(1).Infof("StreamAssign transient error: %v", err) + resp = &master_pb.AssignResponse{Error: st.Message()} + } else { + glog.Errorf("StreamAssign failed to assign: %v", err) + return err + } } if err = server.Send(resp); err != nil { glog.Errorf("StreamAssign failed to send: %v", err) @@ -58,6 +67,10 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if err != nil { return nil, err } + + if ms.Topo.IsWarmingUp() { + return nil, status.Errorf(codes.Unavailable, "master is warming up, topology is still loading") + } diskType := types.ToDiskType(req.DiskType) ver := needle.GetCurrentVersion() diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3f2f8f9d4..70e3ec696 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -21,6 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -163,6 +165,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV resp := &master_pb.LookupVolumeResponse{} volumeLocations := ms.lookupVolumeId(req.VolumeOrFileIds, req.Collection) + notFoundCount := 0 for _, volumeOrFileId := range req.VolumeOrFileIds { vid := volumeOrFileId commaSep := strings.Index(vid, ",") @@ -183,6 +186,9 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV if commaSep > 0 { // this is a file id auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId)) } + if result.NotFound { + notFoundCount++ + } resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{ VolumeOrFileId: result.VolumeOrFileId, Locations: locations, @@ -192,6 +198,12 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV } } + // Only return Unavailable during warmup when every requested ID was a transient not-found + if len(req.VolumeOrFileIds) > 0 && notFoundCount == len(req.VolumeOrFileIds) && ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() { + glog.V(0).Infof("lookup volume warming up: topology is still loading (%d not found)", notFoundCount) + return nil, status.Errorf(codes.Unavailable, "master is warming up, topology is still loading") + } + return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index fca4a3ade..d6dd51328 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -213,7 +213,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { 
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader()) - ms.Topo.LastLeaderChangeTime = time.Now() + ms.Topo.SetLastLeaderChangeTime(time.Now()) if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() { go ms.ensureTopologyId() } @@ -223,11 +223,14 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { } else if raftServer.RaftHashicorp != nil { ms.Topo.HashicorpRaft = raftServer.RaftHashicorp raftServerName = ms.Topo.HashicorpRaft.String() - ms.Topo.LastLeaderChangeTime = time.Now() } ms.Topo.RaftServerAccessLock.Unlock() if ms.Topo.IsLeader() { + // Seed the warmup timestamp so IsWarmingUp() is active even if the + // leader change event hasn't fired yet (e.g. node is already leader + // on startup). Followers don't need warmup state. + ms.Topo.SetLastLeaderChangeTime(time.Now()) glog.V(0).Infof("%s I am the leader!", raftServerName) go ms.ensureTopologyId() } else { diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index c9e0a1ba2..3bca8eadc 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "math" "net/http" "strconv" "strings" @@ -53,7 +54,17 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) location := ms.findVolumeLocation(collection, vid) httpStatus := http.StatusOK if location.Error != "" || location.Locations == nil { - httpStatus = http.StatusNotFound + if location.NotFound && ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() { + httpStatus = http.StatusServiceUnavailable + remaining := ms.Topo.RemainingWarmupDuration() + if remaining < time.Second { + remaining = time.Second + } + w.Header().Set("Retry-After", fmt.Sprintf("%d", int(math.Ceil(remaining.Seconds())))) + location.Error = "service warming up, please retry" + } else { + httpStatus = http.StatusNotFound + } } else { forRead := r.FormValue("read") isRead := forRead == "yes" @@ -94,12 +105,15 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo } err = getVidLocationsErr } + notFound := false if len(locations) == 0 && err == nil { err = fmt.Errorf("volume id %s not found", vid) + notFound = true } ret := operation.LookupResult{ VolumeOrFileId: vid, Locations: locations, + NotFound: notFound, } if err != nil { ret.Error = err.Error() @@ -108,6 +122,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo } func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() { + remaining := ms.Topo.RemainingWarmupDuration() + if remaining < time.Second { + remaining = time.Second + } + w.Header().Set("Retry-After", fmt.Sprintf("%d", int(math.Ceil(remaining.Seconds())))) + writeJsonQuiet(w, r, http.StatusServiceUnavailable, operation.AssignResult{ + Error: "master is warming up, topology is still loading", + }) + return + } stats.AssignRequest() requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64) if e != nil || requestedCount == 0 { diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index 1ff213c19..99d31b1d1 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -73,6 +73,7 @@ func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { select { case isLeader := <-s.RaftHashicorp.LeaderCh(): 
leader, _ := s.RaftHashicorp.LeaderWithID() + s.topo.SetLastLeaderChangeTime(time.Now()) if isLeader { if updatePeers { @@ -99,7 +100,6 @@ func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { } glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) prevLeader = leader - s.topo.LastLeaderChangeTime = time.Now() } } } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index d16db06dd..8b659fb03 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -27,6 +27,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +const ( + // WarmupPulseMultiplier is the number of heartbeat intervals to wait after + // a leader change before treating volume lookup misses as definitive. + WarmupPulseMultiplier = 3 +) + type Topology struct { vacuumLockCounter int64 NodeImpl @@ -60,7 +66,8 @@ type Topology struct { topologyId string topologyIdLock sync.RWMutex - LastLeaderChangeTime time.Time + lastLeaderChangeTime time.Time + lastLeaderChangeTimeLock sync.RWMutex } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -109,6 +116,51 @@ func (t *Topology) IsChildLocked() (bool, error) { return false, nil } +// SetLastLeaderChangeTime records the time of the most recent leader transition. +func (t *Topology) SetLastLeaderChangeTime(ts time.Time) { + t.lastLeaderChangeTimeLock.Lock() + defer t.lastLeaderChangeTimeLock.Unlock() + t.lastLeaderChangeTime = ts +} + +// GetLastLeaderChangeTime returns the time of the most recent leader transition. +func (t *Topology) GetLastLeaderChangeTime() time.Time { + t.lastLeaderChangeTimeLock.RLock() + defer t.lastLeaderChangeTimeLock.RUnlock() + return t.lastLeaderChangeTime +} + +// IsWarmingUp returns true if the master recently became leader and may not yet +// have a complete topology. After a leader change or restart, volume servers need +// up to WarmupPulseMultiplier heartbeat intervals to reconnect and report their volumes. +// Returns false on a fresh cluster start (MaxVolumeId == 0) since there are no +// existing volumes to wait for. +func (t *Topology) IsWarmingUp() bool { + if t.GetMaxVolumeId() == 0 { + return false + } + warmupDuration := time.Duration(t.pulse*WarmupPulseMultiplier) * time.Second + lastChange := t.GetLastLeaderChangeTime() + return !lastChange.IsZero() && time.Since(lastChange) < warmupDuration +} + +// WarmupDuration returns the configured warmup duration based on pulse interval. +func (t *Topology) WarmupDuration() time.Duration { + return time.Duration(t.pulse*WarmupPulseMultiplier) * time.Second +} + +// RemainingWarmupDuration returns how much warmup time is left, or 0 if not warming up. 
+func (t *Topology) RemainingWarmupDuration() time.Duration { + if !t.IsWarmingUp() { + return 0 + } + remaining := t.WarmupDuration() - time.Since(t.GetLastLeaderChangeTime()) + if remaining < 0 { + return 0 + } + return remaining +} + func (t *Topology) IsLeader() bool { t.RaftServerAccessLock.RLock() defer t.RaftServerAccessLock.RUnlock() diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 5442ccdce..7f607d1d5 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -9,7 +9,6 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/server/constants" "google.golang.org/grpc" @@ -141,6 +140,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe } func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) { + // Wait for warmup to complete before searching for slots so the + // topology has all volume servers registered. + for topo.IsWarmingUp() { + glog.V(0).Infof("wait for volume servers to join back") + time.Sleep(topo.WarmupDuration() / WarmupPulseMultiplier / 2) + } + servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations if e != nil { return nil, e @@ -151,11 +157,6 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo reservation.releaseAllReservations() } }() - - for !topo.LastLeaderChangeTime.Add(constants.VolumePulsePeriod * 2).Before(time.Now()) { - glog.V(0).Infof("wait for volume servers to join back") - time.Sleep(constants.VolumePulsePeriod / 2) - } vid, raftErr := topo.NextVolumeId() if raftErr != nil { return nil, raftErr diff --git a/weed/util/retry.go b/weed/util/retry.go index 756d30d50..66e3e5e07 100644 --- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -1,6 +1,7 @@ package util import ( + "context" "strings" "time" @@ -81,6 +82,62 @@ func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldCo } } +// RetryWithBackoff retries an operation on codes.Unavailable errors with exponential +// backoff, respecting context cancellation and a maximum retry duration. +// Returns nil on success, ctx.Err() on context cancellation, or the last error +// when maxDuration is exceeded or a non-retriable error occurs. 
+func RetryWithBackoff(ctx context.Context, name string, maxDuration time.Duration, shouldRetry func(error) bool, operation func() error) error { + waitTime := time.Second + maxWaitTime := RetryWaitTime + deadline := time.Now().Add(maxDuration) + var lastErr error + for { + if ctx.Err() != nil { + return ctx.Err() + } + if time.Until(deadline) <= 0 { + if lastErr != nil { + glog.V(0).Infof("retry %s: giving up after %v: %v", name, maxDuration, lastErr) + return lastErr + } + } + err := operation() + if err == nil { + return nil + } + lastErr = err + if !shouldRetry(err) { + return err + } + remaining := time.Until(deadline) + if remaining <= 0 { + glog.V(0).Infof("retry %s: giving up after %v: %v", name, maxDuration, err) + return err + } + sleepTime := waitTime + if sleepTime > maxWaitTime { + sleepTime = maxWaitTime + } + if sleepTime > remaining { + sleepTime = remaining + } + glog.V(1).Infof("retry %s: retrying in %v: %v", name, sleepTime, err) + timer := time.NewTimer(sleepTime) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + } + waitTime += waitTime / 2 + if waitTime > maxWaitTime { + waitTime = maxWaitTime + } + } +} + // Nvl return the first non-empty string func Nvl(values ...string) string { for _, s := range values { diff --git a/weed/util/retry_test.go b/weed/util/retry_test.go index 23c4132fe..928eca30f 100644 --- a/weed/util/retry_test.go +++ b/weed/util/retry_test.go @@ -1,8 +1,10 @@ package util import ( + "context" "errors" "testing" + "time" ) func TestRetryUntil(t *testing.T) { @@ -63,3 +65,82 @@ func TestRetryUntil(t *testing.T) { } }) } + +func TestRetryWithBackoff(t *testing.T) { + retryableErr := errors.New("unavailable") + shouldRetry := func(err error) bool { return err == retryableErr } + + t.Run("SucceedsAfterRetries", func(t *testing.T) { + callCount := 0 + err := RetryWithBackoff(context.Background(), "test", 30*time.Second, shouldRetry, func() error { + callCount++ + if callCount < 3 { + return retryableErr + } + return nil + }) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if callCount != 3 { + t.Errorf("expected 3 calls, got %d", callCount) + } + }) + + t.Run("StopsOnNonRetryableError", func(t *testing.T) { + callCount := 0 + fatalErr := errors.New("fatal") + err := RetryWithBackoff(context.Background(), "test", 30*time.Second, shouldRetry, func() error { + callCount++ + return fatalErr + }) + if err != fatalErr { + t.Errorf("expected fatal error, got %v", err) + } + if callCount != 1 { + t.Errorf("expected 1 call, got %d", callCount) + } + }) + + t.Run("StopsOnContextCancel", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + callCount := 0 + start := time.Now() + err := RetryWithBackoff(ctx, "test", 30*time.Second, shouldRetry, func() error { + callCount++ + return retryableErr + }) + elapsed := time.Since(start) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("expected DeadlineExceeded, got %v", err) + } + if callCount <= 1 { + t.Errorf("expected multiple calls, got %d", callCount) + } + if elapsed > 5*time.Second { + t.Errorf("took %v, expected to stop near 2s deadline", elapsed) + } + }) + + t.Run("StopsOnMaxDuration", func(t *testing.T) { + callCount := 0 + start := time.Now() + err := RetryWithBackoff(context.Background(), "test", 3*time.Second, shouldRetry, func() error { + callCount++ + return retryableErr + }) + elapsed := time.Since(start) + if err != retryableErr { + t.Errorf("expected 
retryable error, got %v", err) + } + if callCount <= 1 { + t.Errorf("expected multiple calls, got %d", callCount) + } + // Should stop around 3s (maxDuration), not run forever + if elapsed > 6*time.Second { + t.Errorf("took %v, expected to stop near 3s maxDuration", elapsed) + } + }) +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 724931f54..f3ccaf2e2 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -44,67 +44,86 @@ func isCanceledErr(err error) bool { return false } -// LookupVolumeIds queries the master for volume locations (fallback when cache misses) -// Returns partial results with aggregated errors for volumes that failed +// LookupVolumeIds queries the master for volume locations (fallback when cache misses). +// Returns partial results with aggregated errors for volumes that failed. +// Retries on codes.Unavailable (e.g. master warming up after restart) with backoff. func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { - result := make(map[string][]Location) + var result map[string][]Location var lookupErrors []error glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds) - // Use a timeout for the master lookup to prevent indefinite blocking - timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout) - defer cancel() - - err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: volumeIds, - }) - if err != nil { - return fmt.Errorf("master lookup failed: %v", err) - } - - for _, vidLoc := range resp.VolumeIdLocations { - // Preserve per-volume errors from master response - // These could indicate misconfiguration, volume deletion, etc. - if vidLoc.Error != "" { - lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error)) - glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error) - continue - } + retryErr := util.RetryWithBackoff(ctx, "lookup", 30*time.Second, + func(err error) bool { + st, ok := status.FromError(err) + return ok && st.Code() == codes.Unavailable + }, + func() error { + result = make(map[string][]Location) + lookupErrors = nil + + // Per-attempt timeout bounds both master resolution and the RPC + // so a single attempt cannot consume the entire retry budget. 
+ timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout) + defer cancel() - // Parse volume ID from response - parts := strings.Split(vidLoc.VolumeOrFileId, ",") - vidOnly := parts[0] - vid, err := strconv.ParseUint(vidOnly, 10, 32) - if err != nil { - lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err)) - glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) - continue + master := p.masterClient.GetMaster(timeoutCtx) + if master == "" { + if ctx.Err() != nil { + return ctx.Err() + } + return status.Errorf(codes.Unavailable, "no master available") } - var locations []Location - for _, masterLoc := range vidLoc.Locations { - loc := Location{ - Url: masterLoc.Url, - PublicUrl: masterLoc.PublicUrl, - GrpcPort: int(masterLoc.GrpcPort), - DataCenter: masterLoc.DataCenter, + return pb.WithMasterClient(false, master, p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: volumeIds, + }) + if err != nil { + return err } - // Update cache with the location - p.masterClient.addLocation(uint32(vid), loc) - locations = append(locations, loc) - } - if len(locations) > 0 { - result[vidOnly] = locations - } - } - return nil - }) + for _, vidLoc := range resp.VolumeIdLocations { + // Preserve per-volume errors from master response + // These could indicate misconfiguration, volume deletion, etc. + if vidLoc.Error != "" { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error)) + glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + continue + } + + // Parse volume ID from response + parts := strings.Split(vidLoc.VolumeOrFileId, ",") + vidOnly := parts[0] + vid, err := strconv.ParseUint(vidOnly, 10, 32) + if err != nil { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err)) + glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) + continue + } - if err != nil { - return nil, err + var locations []Location + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, + } + // Update cache with the location + p.masterClient.addLocation(uint32(vid), loc) + locations = append(locations, loc) + } + + if len(locations) > 0 { + result[vidOnly] = locations + } + } + return nil + }) + }) + if retryErr != nil { + return nil, retryErr } // Return partial results with detailed errors diff --git a/weed/wdclient/masterclient_retry_test.go b/weed/wdclient/masterclient_retry_test.go new file mode 100644 index 000000000..1b24b78c7 --- /dev/null +++ b/weed/wdclient/masterclient_retry_test.go @@ -0,0 +1,119 @@ +package wdclient + +import ( + "context" + "errors" + "fmt" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +// fakeLookupServer returns Unavailable for the first N calls, then succeeds. 
+type fakeLookupServer struct { + master_pb.UnimplementedSeaweedServer + unavailableCount int32 + callCount atomic.Int32 +} + +func (s *fakeLookupServer) LookupVolume(_ context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) { + n := s.callCount.Add(1) + if n <= s.unavailableCount { + return nil, status.Errorf(codes.Unavailable, "master is warming up") + } + resp := &master_pb.LookupVolumeResponse{} + for _, vid := range req.VolumeOrFileIds { + resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{ + VolumeOrFileId: vid, + Locations: []*master_pb.Location{ + {Url: "127.0.0.1:8080", PublicUrl: "127.0.0.1:8080"}, + }, + }) + } + return resp, nil +} + +func startFakeMasterServer(t *testing.T, srv master_pb.SeaweedServer) pb.ServerAddress { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + grpcServer := grpc.NewServer() + master_pb.RegisterSeaweedServer(grpcServer, srv) + go func() { _ = grpcServer.Serve(lis) }() + t.Cleanup(grpcServer.GracefulStop) + + _, port, _ := net.SplitHostPort(lis.Addr().String()) + return pb.ServerAddress(fmt.Sprintf("127.0.0.1:0.%s", port)) +} + +func TestLookupVolumeIdsRetriesOnUnavailable(t *testing.T) { + srv := &fakeLookupServer{unavailableCount: 3} + addr := startFakeMasterServer(t, srv) + + mc := NewMasterClient( + grpc.WithTransportCredentials(insecure.NewCredentials()), + "", "test", "", "", "", + pb.ServerDiscovery{}, + ) + mc.setCurrentMaster(addr) + mc.grpcTimeout = 5 * time.Second + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + provider := &masterVolumeProvider{masterClient: mc} + result, err := provider.LookupVolumeIds(ctx, []string{"1"}) + + if err != nil { + t.Fatalf("expected success after retries, got error: %v", err) + } + if _, ok := result["1"]; !ok { + t.Error("expected volume 1 in result") + } + if calls := srv.callCount.Load(); calls != 4 { + t.Errorf("expected 4 calls (3 unavailable + 1 success), got %d", calls) + } +} + +func TestLookupVolumeIdsStopsOnContextCancel(t *testing.T) { + srv := &fakeLookupServer{unavailableCount: 1000} + addr := startFakeMasterServer(t, srv) + + mc := NewMasterClient( + grpc.WithTransportCredentials(insecure.NewCredentials()), + "", "test", "", "", "", + pb.ServerDiscovery{}, + ) + mc.setCurrentMaster(addr) + mc.grpcTimeout = 5 * time.Second + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + provider := &masterVolumeProvider{masterClient: mc} + start := time.Now() + _, err := provider.LookupVolumeIds(ctx, []string{"1"}) + elapsed := time.Since(start) + + // Verify the error is from context deadline + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected context.DeadlineExceeded, got: %v", err) + } + // Verify the loop actually retried (not just an immediate failure) + if calls := srv.callCount.Load(); calls <= 1 { + t.Errorf("expected multiple retry attempts, got %d calls", calls) + } + if elapsed > 5*time.Second { + t.Errorf("took %v, expected to stop near context deadline of 2s", elapsed) + } +}