@@ -3,6 +3,7 @@ package weed_server
 import (
 	"fmt"
 	"os"
+	"strings"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -57,6 +58,7 @@ func (vs *VolumeServer) heartbeat() {
 
 	var err error
 	var newLeader pb.ServerAddress
+	duplicateRetryCount := 0
 	for vs.isHeartbeating {
 		for _, master := range vs.SeedMasterNodes {
 			if newLeader != "" {
@@ -66,12 +68,27 @@ func (vs *VolumeServer) heartbeat() {
 				master = newLeader
 			}
 			vs.store.MasterAddress = master
-			newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
+			newLeader, err = vs.doHeartbeatWithRetry(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second, duplicateRetryCount)
 			if err != nil {
 				glog.V(0).Infof("heartbeat to %s error: %v", master, err)
-				time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
+
+				// Check if this is a duplicate UUID retry error
+				if strings.Contains(err.Error(), "duplicate UUIDs detected, retrying connection") {
+					duplicateRetryCount++
+					retryDelay := time.Duration(1<<(duplicateRetryCount-1)) * 2 * time.Second // exponential backoff: 2s, 4s, 8s
+					glog.V(0).Infof("Waiting %v before retrying due to duplicate UUID detection...", retryDelay)
+					time.Sleep(retryDelay)
+				} else {
+					// Regular error, reset duplicate retry count
+					duplicateRetryCount = 0
+					time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
+				}
+
 				newLeader = ""
 				vs.store.MasterAddress = ""
-			}
+			} else {
+				// Successful connection, reset retry count
+				duplicateRetryCount = 0
+			}
 			if !vs.isHeartbeating {
 				break
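The delay in this hunk grows by doubling: duplicateRetryCount is incremented before the shift, so 1<<(duplicateRetryCount-1) multiplies the 2-second base by 1, 2, 4 on successive duplicate-UUID errors. A minimal standalone sketch of that arithmetic (illustration only, not part of the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Standalone check of the backoff schedule used in heartbeat(): the count
	// is incremented first, then the delay doubles per attempt: 2s, 4s, 8s.
	duplicateRetryCount := 0
	for attempt := 1; attempt <= 3; attempt++ {
		duplicateRetryCount++
		retryDelay := time.Duration(1<<(duplicateRetryCount-1)) * 2 * time.Second
		fmt.Printf("attempt %d: wait %v\n", attempt, retryDelay)
	}
	// Output:
	// attempt 1: wait 2s
	// attempt 2: wait 4s
	// attempt 3: wait 8s
}

The same 2s/4s/8s schedule appears again in doHeartbeatWithRetry below, where the shift uses the not-yet-incremented count.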
@@ -90,6 +107,10 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
 }
 
 func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
+	return vs.doHeartbeatWithRetry(masterAddress, grpcDialOption, sleepInterval, 0)
+}
+
+func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration, duplicateRetryCount int) (newLeader pb.ServerAddress, err error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -127,9 +148,24 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
 						}
 					}
 				}
-				glog.Errorf("Shut down Volume Server due to duplicate volume directories: %v", duplicateDir)
-				os.Exit(1)
+
+				// Implement retry logic for potential race conditions
+				const maxRetries = 3
+				if duplicateRetryCount < maxRetries {
+					retryDelay := time.Duration(1<<duplicateRetryCount) * 2 * time.Second // exponential backoff: 2s, 4s, 8s
+					glog.Errorf("Master reported duplicate volume directories: %v (retry %d/%d)", duplicateDir, duplicateRetryCount+1, maxRetries)
+					glog.Errorf("This might be due to a race condition during reconnection. Waiting %v before retrying...", retryDelay)
+
+					// Return error to trigger retry with increased count
+					doneChan <- fmt.Errorf("duplicate UUIDs detected, retrying connection (attempt %d/%d)", duplicateRetryCount+1, maxRetries)
+					return
+				} else {
+					// After max retries, this is likely a real duplicate
+					glog.Errorf("Shut down Volume Server due to persistent duplicate volume directories after %d retries: %v", maxRetries, duplicateDir)
+					glog.Errorf("Please check if another volume server is using the same directory")
+					os.Exit(1)
+				}
 			}
 			volumeOptsChanged := false
 			if vs.store.GetPreallocate() != in.GetPreallocate() {
 				vs.store.SetPreallocate(in.GetPreallocate())