diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 2e429a8a1..7c2c71074 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,14 +3,19 @@ package operation
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
+	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/security"
 	"github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type VolumeAssignRequest struct {
@@ -106,13 +111,21 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri
 		WritableVolumeCount: request.WritableVolumeCount,
 	}
 	if err = ap.assignClient.Send(req); err != nil {
+		ap.assignClient = nil
 		return nil, fmt.Errorf("StreamAssignSend: %w", err)
 	}
 	resp, grpcErr := ap.assignClient.Recv()
 	if grpcErr != nil {
+		ap.assignClient = nil
 		return nil, grpcErr
 	}
 	if resp.Error != "" {
+		// StreamAssign returns transient warmup errors as in-band responses.
+		// Wrap them as codes.Unavailable so the caller's retry logic can
+		// classify them as retriable.
+		if strings.Contains(resp.Error, "warming up") {
+			return nil, status.Errorf(codes.Unavailable, "StreamAssignRecv: %s", resp.Error)
+		}
 		return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
 	}
 
@@ -149,50 +162,73 @@ func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialO
 	var lastError error
 	ret := &AssignResult{}
 
+	// Compute a single deadline so all request entries (primary + fallback)
+	// share one 30s retry budget instead of each getting its own.
+	// Use a deadline-aware context so both RetryWithBackoff and per-attempt
+	// timeouts are bounded by the shared budget.
+	deadline := time.Now().Add(30 * time.Second)
+	deadlineCtx, deadlineCancel := context.WithDeadline(ctx, deadline)
+	defer deadlineCancel()
+
 	for i, request := range requests {
 		if request == nil {
 			continue
 		}
 
-		lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
-			req := &master_pb.AssignRequest{
-				Count:               request.Count,
-				Replication:         request.Replication,
-				Collection:          request.Collection,
-				Ttl:                 request.Ttl,
-				DiskType:            request.DiskType,
-				DataCenter:          request.DataCenter,
-				Rack:                request.Rack,
-				DataNode:            request.DataNode,
-				WritableVolumeCount: request.WritableVolumeCount,
-			}
-			resp, grpcErr := masterClient.Assign(ctx, req)
-			if grpcErr != nil {
-				return grpcErr
-			}
-
-			if resp.Error != "" {
-				return fmt.Errorf("assignRequest: %v", resp.Error)
-			}
+		remaining := time.Until(deadline)
+		if remaining <= 0 {
+			break
+		}
 
-			ret.Count = resp.Count
-			ret.Fid = resp.Fid
-			ret.Url = resp.Location.Url
-			ret.PublicUrl = resp.Location.PublicUrl
-			ret.GrpcPort = int(resp.Location.GrpcPort)
-			ret.Error = resp.Error
-			ret.Auth = security.EncodedJwt(resp.Auth)
-			for _, r := range resp.Replicas {
-				ret.Replicas = append(ret.Replicas, Location{
-					Url:        r.Url,
-					PublicUrl:  r.PublicUrl,
-					DataCenter: r.DataCenter,
+		lastError = util.RetryWithBackoff(deadlineCtx, "assign", remaining,
+			func(err error) bool {
+				st, ok := status.FromError(err)
+				return ok && st.Code() == codes.Unavailable
+			},
+			func() error {
+				// Per-attempt timeout to prevent a single slow RPC from consuming the entire retry budget
+				attemptCtx, attemptCancel := context.WithTimeout(deadlineCtx, 10*time.Second)
+				defer attemptCancel()
+				return WithMasterServerClient(false, masterFn(attemptCtx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+					req := &master_pb.AssignRequest{
+						Count:               request.Count,
+						Replication:         request.Replication,
+						Collection:          request.Collection,
+						Ttl:                 request.Ttl,
+						DiskType:            request.DiskType,
+						DataCenter:          request.DataCenter,
+						Rack:                request.Rack,
+						DataNode:            request.DataNode,
+						WritableVolumeCount: request.WritableVolumeCount,
+					}
+					resp, grpcErr := masterClient.Assign(attemptCtx, req)
+					if grpcErr != nil {
+						return grpcErr
+					}
+
+					if resp.Error != "" {
+						return fmt.Errorf("assignRequest: %v", resp.Error)
+					}
+
+					ret.Count = resp.Count
+					ret.Fid = resp.Fid
+					ret.Url = resp.Location.Url
+					ret.PublicUrl = resp.Location.PublicUrl
+					ret.GrpcPort = int(resp.Location.GrpcPort)
+					ret.Error = resp.Error
+					ret.Auth = security.EncodedJwt(resp.Auth)
+					ret.Replicas = nil
+					for _, r := range resp.Replicas {
+						ret.Replicas = append(ret.Replicas, Location{
+							Url:        r.Url,
+							PublicUrl:  r.PublicUrl,
+							DataCenter: r.DataCenter,
+						})
+					}
+
+					return nil
 				})
-			}
-
-			return nil
-
-		})
+			})
 
 		if lastError != nil {
 			stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc()
diff --git a/weed/operation/assign_file_id_retry_test.go b/weed/operation/assign_file_id_retry_test.go
new file mode 100644
index 000000000..ab9263dc0
--- /dev/null
+++ b/weed/operation/assign_file_id_retry_test.go
@@ -0,0 +1,108 @@
+package operation
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/status"
+)
+
+// fakeAssignServer returns Unavailable for the first N calls, then succeeds.
+type fakeAssignServer struct {
+	master_pb.UnimplementedSeaweedServer
+	unavailableCount int32
+	callCount        atomic.Int32
+}
+
+func (s *fakeAssignServer) Assign(_ context.Context, _ *master_pb.AssignRequest) (*master_pb.AssignResponse, error) {
+	n := s.callCount.Add(1)
+	if n <= s.unavailableCount {
+		return nil, status.Errorf(codes.Unavailable, "master is warming up")
+	}
+	return &master_pb.AssignResponse{
+		Fid:   "1,abc",
+		Count: 1,
+		Location: &master_pb.Location{
+			Url:       "127.0.0.1:8080",
+			PublicUrl: "127.0.0.1:8080",
+		},
+	}, nil
+}
+
+func startFakeServer(t *testing.T, srv master_pb.SeaweedServer) pb.ServerAddress {
+	t.Helper()
+	lis, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	grpcServer := grpc.NewServer()
+	master_pb.RegisterSeaweedServer(grpcServer, srv)
+	go func() { _ = grpcServer.Serve(lis) }()
+	t.Cleanup(grpcServer.GracefulStop)
+
+	_, port, _ := net.SplitHostPort(lis.Addr().String())
+	// Use "0." format so ToGrpcAddress() resolves to the actual port
+	return pb.ServerAddress(fmt.Sprintf("127.0.0.1:0.%s", port))
+}
+
+func TestAssignRetriesOnUnavailable(t *testing.T) {
+	srv := &fakeAssignServer{unavailableCount: 3}
+	addr := startFakeServer(t, srv)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	ret, err := Assign(ctx, func(_ context.Context) pb.ServerAddress {
+		return addr
+	}, grpc.WithTransportCredentials(insecure.NewCredentials()), &VolumeAssignRequest{Count: 1})
+
+	if err != nil {
+		t.Fatalf("expected success after retries, got error: %v", err)
+	}
+	if ret.Fid != "1,abc" {
+		t.Errorf("expected fid '1,abc', got '%s'", ret.Fid)
+	}
+	if calls := srv.callCount.Load(); calls != 4 {
+		t.Errorf("expected 4 calls (3 unavailable + 1 success), got %d", calls)
+	}
+}
+
+func TestAssignStopsOnContextCancel(t *testing.T) {
+	srv := &fakeAssignServer{unavailableCount: 1000} // never succeeds
+	addr := startFakeServer(t, srv)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+
+	start := time.Now()
+	_, err := Assign(ctx, func(_ context.Context) pb.ServerAddress {
+		return addr
+	}, grpc.WithTransportCredentials(insecure.NewCredentials()), &VolumeAssignRequest{Count: 1})
+
+	elapsed := time.Since(start)
+	if err == nil {
+		t.Fatal("expected error from context cancellation")
+	}
+	// Should stop within a reasonable time after context deadline
+	if elapsed > 5*time.Second {
+		t.Errorf("took %v, expected to stop near context deadline of 2s", elapsed)
+	}
+	// Verify the loop actually retried (not just an immediate failure)
+	if calls := srv.callCount.Load(); calls <= 1 {
+		t.Errorf("expected multiple retry attempts, got %d calls", calls)
+	}
+	// Verify the error is from context deadline
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Errorf("expected context.DeadlineExceeded, got: %v", err)
+	}
+}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 12ad37330..aedb8f8ee 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -30,6 +30,7 @@ type LookupResult struct {
 	Locations      []Location `json:"locations,omitempty"`
 	Jwt            string     `json:"jwt,omitempty"`
 	Error          string     `json:"error,omitempty"`
+	NotFound       bool       `json:"-"`
 }
 
 func (lr *LookupResult) String() string {
diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go
index 081d79cc1..4911c49a3 100644
--- a/weed/server/master_grpc_server_assign.go
+++ b/weed/server/master_grpc_server_assign.go
@@ -17,6 +17,8 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/topology"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer) error {
@@ -28,8 +30,15 @@ func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer
 		}
 		resp, err := ms.Assign(context.Background(), req)
 		if err != nil {
-			glog.Errorf("StreamAssign failed to assign: %v", err)
-			return err
+			// Return transient errors (e.g. warmup) as in-band error responses
+			// instead of killing the stream, so pooled connections survive.
+			if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
+				glog.V(1).Infof("StreamAssign transient error: %v", err)
+				resp = &master_pb.AssignResponse{Error: st.Message()}
+			} else {
+				glog.Errorf("StreamAssign failed to assign: %v", err)
+				return err
+			}
 		}
 		if err = server.Send(resp); err != nil {
 			glog.Errorf("StreamAssign failed to send: %v", err)
@@ -58,6 +67,10 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
 	if err != nil {
 		return nil, err
 	}
+
+	if ms.Topo.IsWarmingUp() {
+		return nil, status.Errorf(codes.Unavailable, "master is warming up, topology is still loading")
+	}
 
 	diskType := types.ToDiskType(req.DiskType)
 	ver := needle.GetCurrentVersion()
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 3f2f8f9d4..70e3ec696 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -21,6 +21,8 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 const (
@@ -163,6 +165,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
 	resp := &master_pb.LookupVolumeResponse{}
 	volumeLocations := ms.lookupVolumeId(req.VolumeOrFileIds, req.Collection)
 
+	notFoundCount := 0
 	for _, volumeOrFileId := range req.VolumeOrFileIds {
 		vid := volumeOrFileId
 		commaSep := strings.Index(vid, ",")
@@ -183,6 +186,9 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
 		if commaSep > 0 { // this is a file id
 			auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
 		}
+		if result.NotFound {
+			notFoundCount++
+		}
 		resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
 			VolumeOrFileId: result.VolumeOrFileId,
 			Locations:      locations,
@@ -192,6 +198,12 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
 		}
 	}
 
+	// Only return Unavailable during warmup when every requested ID was a transient not-found
+	if len(req.VolumeOrFileIds) > 0 && notFoundCount == len(req.VolumeOrFileIds) && ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() {
+		glog.V(0).Infof("lookup volume warming up: topology is still loading (%d not found)", notFoundCount)
+		return nil, status.Errorf(codes.Unavailable, "master is warming up, topology is still loading")
+	}
+
 	return resp, nil
 }
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index fca4a3ade..d6dd51328 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -213,7 +213,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
 			stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
 			if ms.Topo.RaftServer.Leader() != "" {
 				glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader())
-				ms.Topo.LastLeaderChangeTime = time.Now()
+				ms.Topo.SetLastLeaderChangeTime(time.Now())
 				if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() {
 					go ms.ensureTopologyId()
 				}
@@ -223,11 +223,14 @@
 	} else if raftServer.RaftHashicorp != nil {
 		ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
 		raftServerName = ms.Topo.HashicorpRaft.String()
-		ms.Topo.LastLeaderChangeTime = time.Now()
 	}
 	ms.Topo.RaftServerAccessLock.Unlock()
 
 	if ms.Topo.IsLeader() {
+		// Seed the warmup timestamp so IsWarmingUp() is active even if the
+		// leader change event hasn't fired yet (e.g. node is already leader
+		// on startup). Followers don't need warmup state.
+		ms.Topo.SetLastLeaderChangeTime(time.Now())
 		glog.V(0).Infof("%s I am the leader!", raftServerName)
 		go ms.ensureTopologyId()
 	} else {
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index c9e0a1ba2..3bca8eadc 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -2,6 +2,7 @@ package weed_server
 
 import (
 	"fmt"
+	"math"
 	"net/http"
 	"strconv"
 	"strings"
@@ -53,7 +54,17 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
 	location := ms.findVolumeLocation(collection, vid)
 	httpStatus := http.StatusOK
 	if location.Error != "" || location.Locations == nil {
-		httpStatus = http.StatusNotFound
+		if location.NotFound && ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() {
+			httpStatus = http.StatusServiceUnavailable
+			remaining := ms.Topo.RemainingWarmupDuration()
+			if remaining < time.Second {
+				remaining = time.Second
+			}
+			w.Header().Set("Retry-After", fmt.Sprintf("%d", int(math.Ceil(remaining.Seconds()))))
+			location.Error = "service warming up, please retry"
+		} else {
+			httpStatus = http.StatusNotFound
+		}
 	} else {
 		forRead := r.FormValue("read")
 		isRead := forRead == "yes"
@@ -94,12 +105,15 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
 		}
 		err = getVidLocationsErr
 	}
+	notFound := false
 	if len(locations) == 0 && err == nil {
 		err = fmt.Errorf("volume id %s not found", vid)
+		notFound = true
 	}
 	ret := operation.LookupResult{
 		VolumeOrFileId: vid,
 		Locations:      locations,
+		NotFound:       notFound,
 	}
 	if err != nil {
 		ret.Error = err.Error()
@@ -108,6 +122,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
 }
 
 func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
+	if ms.Topo.IsLeader() && ms.Topo.IsWarmingUp() {
+		remaining := ms.Topo.RemainingWarmupDuration()
+		if remaining < time.Second {
+			remaining = time.Second
+		}
+		w.Header().Set("Retry-After", fmt.Sprintf("%d", int(math.Ceil(remaining.Seconds()))))
+		writeJsonQuiet(w, r, http.StatusServiceUnavailable, operation.AssignResult{
+			Error: "master is warming up, topology is still loading",
+		})
+		return
+	}
 	stats.AssignRequest()
 	requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
 	if e != nil || requestedCount == 0 {
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index 1ff213c19..99d31b1d1 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -73,6 +73,7 @@ func (s *RaftServer) monitorLeaderLoop(updatePeers bool) {
 		select {
 		case isLeader := <-s.RaftHashicorp.LeaderCh():
 			leader, _ := s.RaftHashicorp.LeaderWithID()
+			s.topo.SetLastLeaderChangeTime(time.Now())
 			if isLeader {
 				if updatePeers {
@@ -99,7 +100,6 @@
 			}
 			glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
 			prevLeader = leader
-			s.topo.LastLeaderChangeTime = time.Now()
 		}
 	}
 }
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index d16db06dd..8b659fb03 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -27,6 +27,12 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
+const (
+	// WarmupPulseMultiplier is the number of heartbeat intervals to wait after
+	// a leader change before treating volume lookup misses as definitive.
+	WarmupPulseMultiplier = 3
+)
+
 type Topology struct {
 	vacuumLockCounter int64
 	NodeImpl
@@ -60,7 +66,8 @@ type Topology struct {
 	topologyId     string
 	topologyIdLock sync.RWMutex
 
-	LastLeaderChangeTime time.Time
+	lastLeaderChangeTime     time.Time
+	lastLeaderChangeTimeLock sync.RWMutex
 }
 
 func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -109,6 +116,51 @@ func (t *Topology) IsChildLocked() (bool, error) {
 	return false, nil
 }
 
+// SetLastLeaderChangeTime records the time of the most recent leader transition.
+func (t *Topology) SetLastLeaderChangeTime(ts time.Time) {
+	t.lastLeaderChangeTimeLock.Lock()
+	defer t.lastLeaderChangeTimeLock.Unlock()
+	t.lastLeaderChangeTime = ts
+}
+
+// GetLastLeaderChangeTime returns the time of the most recent leader transition.
+func (t *Topology) GetLastLeaderChangeTime() time.Time {
+	t.lastLeaderChangeTimeLock.RLock()
+	defer t.lastLeaderChangeTimeLock.RUnlock()
+	return t.lastLeaderChangeTime
+}
+
+// IsWarmingUp returns true if the master recently became leader and may not yet
+// have a complete topology. After a leader change or restart, volume servers need
+// up to WarmupPulseMultiplier heartbeat intervals to reconnect and report their volumes.
+// Returns false on a fresh cluster start (MaxVolumeId == 0) since there are no
+// existing volumes to wait for.
+func (t *Topology) IsWarmingUp() bool {
+	if t.GetMaxVolumeId() == 0 {
+		return false
+	}
+	warmupDuration := time.Duration(t.pulse*WarmupPulseMultiplier) * time.Second
+	lastChange := t.GetLastLeaderChangeTime()
+	return !lastChange.IsZero() && time.Since(lastChange) < warmupDuration
+}
+
+// WarmupDuration returns the configured warmup duration based on pulse interval.
+func (t *Topology) WarmupDuration() time.Duration {
+	return time.Duration(t.pulse*WarmupPulseMultiplier) * time.Second
+}
+
+// RemainingWarmupDuration returns how much warmup time is left, or 0 if not warming up.
+func (t *Topology) RemainingWarmupDuration() time.Duration {
+	if !t.IsWarmingUp() {
+		return 0
+	}
+	remaining := t.WarmupDuration() - time.Since(t.GetLastLeaderChangeTime())
+	if remaining < 0 {
+		return 0
+	}
+	return remaining
+}
+
 func (t *Topology) IsLeader() bool {
 	t.RaftServerAccessLock.RLock()
 	defer t.RaftServerAccessLock.RUnlock()
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 5442ccdce..7f607d1d5 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -9,7 +9,6 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
-	"github.com/seaweedfs/seaweedfs/weed/server/constants"
 
 	"google.golang.org/grpc"
 
@@ -141,6 +140,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe
 }
 
 func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
+	// Wait for warmup to complete before searching for slots so the
+	// topology has all volume servers registered.
+	for topo.IsWarmingUp() {
+		glog.V(0).Infof("wait for volume servers to join back")
+		time.Sleep(topo.WarmupDuration() / WarmupPulseMultiplier / 2)
+	}
+
 	servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations
 	if e != nil {
 		return nil, e
@@ -151,11 +157,6 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
 			reservation.releaseAllReservations()
 		}
 	}()
-
-	for !topo.LastLeaderChangeTime.Add(constants.VolumePulsePeriod * 2).Before(time.Now()) {
-		glog.V(0).Infof("wait for volume servers to join back")
-		time.Sleep(constants.VolumePulsePeriod / 2)
-	}
 	vid, raftErr := topo.NextVolumeId()
 	if raftErr != nil {
 		return nil, raftErr
diff --git a/weed/util/retry.go b/weed/util/retry.go
index 756d30d50..66e3e5e07 100644
--- a/weed/util/retry.go
+++ b/weed/util/retry.go
@@ -1,6 +1,7 @@
 package util
 
 import (
+	"context"
 	"strings"
 	"time"
 
@@ -81,6 +82,62 @@ func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldCo
 	}
 }
 
+// RetryWithBackoff retries an operation on codes.Unavailable errors with exponential
+// backoff, respecting context cancellation and a maximum retry duration.
+// Returns nil on success, ctx.Err() on context cancellation, or the last error
+// when maxDuration is exceeded or a non-retriable error occurs.
+func RetryWithBackoff(ctx context.Context, name string, maxDuration time.Duration, shouldRetry func(error) bool, operation func() error) error {
+	waitTime := time.Second
+	maxWaitTime := RetryWaitTime
+	deadline := time.Now().Add(maxDuration)
+	var lastErr error
+	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+		if time.Until(deadline) <= 0 {
+			if lastErr != nil {
+				glog.V(0).Infof("retry %s: giving up after %v: %v", name, maxDuration, lastErr)
+				return lastErr
+			}
+		}
+		err := operation()
+		if err == nil {
+			return nil
+		}
+		lastErr = err
+		if !shouldRetry(err) {
+			return err
+		}
+		remaining := time.Until(deadline)
+		if remaining <= 0 {
+			glog.V(0).Infof("retry %s: giving up after %v: %v", name, maxDuration, err)
+			return err
+		}
+		sleepTime := waitTime
+		if sleepTime > maxWaitTime {
+			sleepTime = maxWaitTime
+		}
+		if sleepTime > remaining {
+			sleepTime = remaining
+		}
+		glog.V(1).Infof("retry %s: retrying in %v: %v", name, sleepTime, err)
+		timer := time.NewTimer(sleepTime)
+		select {
+		case <-ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+			return ctx.Err()
+		case <-timer.C:
+		}
+		waitTime += waitTime / 2
+		if waitTime > maxWaitTime {
+			waitTime = maxWaitTime
+		}
+	}
+}
+
 // Nvl return the first non-empty string
 func Nvl(values ...string) string {
 	for _, s := range values {
diff --git a/weed/util/retry_test.go b/weed/util/retry_test.go
index 23c4132fe..928eca30f 100644
--- a/weed/util/retry_test.go
+++ b/weed/util/retry_test.go
@@ -1,8 +1,10 @@
 package util
 
 import (
+	"context"
 	"errors"
 	"testing"
+	"time"
 )
 
 func TestRetryUntil(t *testing.T) {
@@ -63,3 +65,82 @@ func TestRetryUntil(t *testing.T) {
 		}
 	})
 }
+
+func TestRetryWithBackoff(t *testing.T) {
+	retryableErr := errors.New("unavailable")
+	shouldRetry := func(err error) bool { return err == retryableErr }
+
+	t.Run("SucceedsAfterRetries", func(t *testing.T) {
+		callCount := 0
+		err := RetryWithBackoff(context.Background(), "test", 30*time.Second, shouldRetry, func() error {
+			callCount++
+			if callCount < 3 {
+				return retryableErr
+			}
+			return nil
+		})
+		if err != nil {
+			t.Errorf("expected success, got %v", err)
+		}
+		if callCount != 3 {
+			t.Errorf("expected 3 calls, got %d", callCount)
+		}
+	})
+
+	t.Run("StopsOnNonRetryableError", func(t *testing.T) {
+		callCount := 0
+		fatalErr := errors.New("fatal")
+		err := RetryWithBackoff(context.Background(), "test", 30*time.Second, shouldRetry, func() error {
+			callCount++
+			return fatalErr
+		})
+		if err != fatalErr {
+			t.Errorf("expected fatal error, got %v", err)
+		}
+		if callCount != 1 {
+			t.Errorf("expected 1 call, got %d", callCount)
+		}
+	})
+
+	t.Run("StopsOnContextCancel", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		callCount := 0
+		start := time.Now()
+		err := RetryWithBackoff(ctx, "test", 30*time.Second, shouldRetry, func() error {
+			callCount++
+			return retryableErr
+		})
+		elapsed := time.Since(start)
+		if !errors.Is(err, context.DeadlineExceeded) {
+			t.Errorf("expected DeadlineExceeded, got %v", err)
+		}
+		if callCount <= 1 {
+			t.Errorf("expected multiple calls, got %d", callCount)
+		}
+		if elapsed > 5*time.Second {
+			t.Errorf("took %v, expected to stop near 2s deadline", elapsed)
+		}
+	})
+
+	t.Run("StopsOnMaxDuration", func(t *testing.T) {
+		callCount := 0
+		start := time.Now()
+		err := RetryWithBackoff(context.Background(), "test", 3*time.Second, shouldRetry, func() error {
+			callCount++
+			return retryableErr
+		})
+		elapsed := time.Since(start)
+		if err != retryableErr {
+			t.Errorf("expected retryable error, got %v", err)
+		}
+		if callCount <= 1 {
+			t.Errorf("expected multiple calls, got %d", callCount)
+		}
+		// Should stop around 3s (maxDuration), not run forever
+		if elapsed > 6*time.Second {
+			t.Errorf("took %v, expected to stop near 3s maxDuration", elapsed)
+		}
+	})
+}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 724931f54..f3ccaf2e2 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -44,67 +44,86 @@ func isCanceledErr(err error) bool {
 	return false
 }
 
-// LookupVolumeIds queries the master for volume locations (fallback when cache misses)
-// Returns partial results with aggregated errors for volumes that failed
+// LookupVolumeIds queries the master for volume locations (fallback when cache misses).
+// Returns partial results with aggregated errors for volumes that failed.
+// Retries on codes.Unavailable (e.g. master warming up after restart) with backoff.
 func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
-	result := make(map[string][]Location)
+	var result map[string][]Location
 	var lookupErrors []error
 
 	glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
 
-	// Use a timeout for the master lookup to prevent indefinite blocking
-	timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
-	defer cancel()
-
-	err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
-		resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
-			VolumeOrFileIds: volumeIds,
-		})
-		if err != nil {
-			return fmt.Errorf("master lookup failed: %v", err)
-		}
-
-		for _, vidLoc := range resp.VolumeIdLocations {
-			// Preserve per-volume errors from master response
-			// These could indicate misconfiguration, volume deletion, etc.
-			if vidLoc.Error != "" {
-				lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
-				glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
-				continue
-			}
+	retryErr := util.RetryWithBackoff(ctx, "lookup", 30*time.Second,
+		func(err error) bool {
+			st, ok := status.FromError(err)
+			return ok && st.Code() == codes.Unavailable
+		},
+		func() error {
+			result = make(map[string][]Location)
+			lookupErrors = nil
+
+			// Per-attempt timeout bounds both master resolution and the RPC
+			// so a single attempt cannot consume the entire retry budget.
+			timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
+			defer cancel()
 
-			// Parse volume ID from response
-			parts := strings.Split(vidLoc.VolumeOrFileId, ",")
-			vidOnly := parts[0]
-			vid, err := strconv.ParseUint(vidOnly, 10, 32)
-			if err != nil {
-				lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
-				glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
-				continue
+			master := p.masterClient.GetMaster(timeoutCtx)
+			if master == "" {
+				if ctx.Err() != nil {
+					return ctx.Err()
+				}
+				return status.Errorf(codes.Unavailable, "no master available")
 			}
 
-			var locations []Location
-			for _, masterLoc := range vidLoc.Locations {
-				loc := Location{
-					Url:        masterLoc.Url,
-					PublicUrl:  masterLoc.PublicUrl,
-					GrpcPort:   int(masterLoc.GrpcPort),
-					DataCenter: masterLoc.DataCenter,
+			return pb.WithMasterClient(false, master, p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+				resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
+					VolumeOrFileIds: volumeIds,
+				})
+				if err != nil {
+					return err
 				}
-				// Update cache with the location
-				p.masterClient.addLocation(uint32(vid), loc)
-				locations = append(locations, loc)
-			}
-
-			if len(locations) > 0 {
-				result[vidOnly] = locations
-			}
-		}
-		return nil
-	})
+				for _, vidLoc := range resp.VolumeIdLocations {
+					// Preserve per-volume errors from master response
+					// These could indicate misconfiguration, volume deletion, etc.
+					if vidLoc.Error != "" {
+						lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
+						glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+						continue
+					}
+
+					// Parse volume ID from response
+					parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+					vidOnly := parts[0]
+					vid, err := strconv.ParseUint(vidOnly, 10, 32)
+					if err != nil {
+						lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
+						glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+						continue
+					}
 
-	if err != nil {
-		return nil, err
+					var locations []Location
+					for _, masterLoc := range vidLoc.Locations {
+						loc := Location{
+							Url:        masterLoc.Url,
+							PublicUrl:  masterLoc.PublicUrl,
+							GrpcPort:   int(masterLoc.GrpcPort),
+							DataCenter: masterLoc.DataCenter,
+						}
+						// Update cache with the location
+						p.masterClient.addLocation(uint32(vid), loc)
+						locations = append(locations, loc)
+					}
+
+					if len(locations) > 0 {
+						result[vidOnly] = locations
+					}
+				}
+				return nil
+			})
+		})
+
+	if retryErr != nil {
+		return nil, retryErr
 	}
 
 	// Return partial results with detailed errors
diff --git a/weed/wdclient/masterclient_retry_test.go b/weed/wdclient/masterclient_retry_test.go
new file mode 100644
index 000000000..1b24b78c7
--- /dev/null
+++ b/weed/wdclient/masterclient_retry_test.go
@@ -0,0 +1,119 @@
+package wdclient
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/status"
+)
+
+// fakeLookupServer returns Unavailable for the first N calls, then succeeds.
+type fakeLookupServer struct {
+	master_pb.UnimplementedSeaweedServer
+	unavailableCount int32
+	callCount        atomic.Int32
+}
+
+func (s *fakeLookupServer) LookupVolume(_ context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
+	n := s.callCount.Add(1)
+	if n <= s.unavailableCount {
+		return nil, status.Errorf(codes.Unavailable, "master is warming up")
+	}
+	resp := &master_pb.LookupVolumeResponse{}
+	for _, vid := range req.VolumeOrFileIds {
+		resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
+			VolumeOrFileId: vid,
+			Locations: []*master_pb.Location{
+				{Url: "127.0.0.1:8080", PublicUrl: "127.0.0.1:8080"},
+			},
+		})
+	}
+	return resp, nil
+}
+
+func startFakeMasterServer(t *testing.T, srv master_pb.SeaweedServer) pb.ServerAddress {
+	t.Helper()
+	lis, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	grpcServer := grpc.NewServer()
+	master_pb.RegisterSeaweedServer(grpcServer, srv)
+	go func() { _ = grpcServer.Serve(lis) }()
+	t.Cleanup(grpcServer.GracefulStop)
+
+	_, port, _ := net.SplitHostPort(lis.Addr().String())
+	return pb.ServerAddress(fmt.Sprintf("127.0.0.1:0.%s", port))
+}
+
+func TestLookupVolumeIdsRetriesOnUnavailable(t *testing.T) {
+	srv := &fakeLookupServer{unavailableCount: 3}
+	addr := startFakeMasterServer(t, srv)
+
+	mc := NewMasterClient(
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		"", "test", "", "", "",
+		pb.ServerDiscovery{},
+	)
+	mc.setCurrentMaster(addr)
+	mc.grpcTimeout = 5 * time.Second
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	provider := &masterVolumeProvider{masterClient: mc}
+	result, err := provider.LookupVolumeIds(ctx, []string{"1"})
+
+	if err != nil {
+		t.Fatalf("expected success after retries, got error: %v", err)
+	}
+	if _, ok := result["1"]; !ok {
+		t.Error("expected volume 1 in result")
+	}
+	if calls := srv.callCount.Load(); calls != 4 {
+		t.Errorf("expected 4 calls (3 unavailable + 1 success), got %d", calls)
+	}
+}
+
+func TestLookupVolumeIdsStopsOnContextCancel(t *testing.T) {
+	srv := &fakeLookupServer{unavailableCount: 1000}
+	addr := startFakeMasterServer(t, srv)
+
+	mc := NewMasterClient(
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		"", "test", "", "", "",
+		pb.ServerDiscovery{},
+	)
+	mc.setCurrentMaster(addr)
+	mc.grpcTimeout = 5 * time.Second
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+
+	provider := &masterVolumeProvider{masterClient: mc}
+	start := time.Now()
+	_, err := provider.LookupVolumeIds(ctx, []string{"1"})
+	elapsed := time.Since(start)
+
+	// Verify the error is from context deadline
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("expected context.DeadlineExceeded, got: %v", err)
+	}
+	// Verify the loop actually retried (not just an immediate failure)
+	if calls := srv.callCount.Load(); calls <= 1 {
+		t.Errorf("expected multiple retry attempts, got %d calls", calls)
+	}
+	if elapsed > 5*time.Second {
+		t.Errorf("took %v, expected to stop near context deadline of 2s", elapsed)
+	}
+}
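A minimal usage sketch, not part of the patch: it shows how a caller might drive the RetryWithBackoff helper added in weed/util/retry.go above, using the same codes.Unavailable classification the patch applies in Assign and LookupVolumeIds. The names callWithWarmupRetry and doMasterRPC are hypothetical stand-ins.

package example

import (
	"context"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// callWithWarmupRetry (hypothetical) retries doMasterRPC for up to 30 seconds,
// but only while the error is gRPC Unavailable (e.g. the master is warming up).
func callWithWarmupRetry(ctx context.Context, doMasterRPC func(context.Context) error) error {
	isUnavailable := func(err error) bool {
		st, ok := status.FromError(err)
		return ok && st.Code() == codes.Unavailable
	}
	return util.RetryWithBackoff(ctx, "exampleOp", 30*time.Second, isUnavailable, func() error {
		return doMasterRPC(ctx)
	})
}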