diff --git a/weed/command/volume.go b/weed/command/volume.go index 58dee0e52..cbd5bc676 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -258,7 +258,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes, *v.idxFolder, volumeNeedleMapKind, - v.masters, constants.VolumePulseSeconds, *v.dataCenter, *v.rack, + v.masters, constants.VolumePulsePeriod, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, diff --git a/weed/server/constants/volume.go b/weed/server/constants/volume.go index 77c7b7b47..a1287d118 100644 --- a/weed/server/constants/volume.go +++ b/weed/server/constants/volume.go @@ -1,5 +1,7 @@ package constants +import "time" + const ( - VolumePulseSeconds = 5 + VolumePulsePeriod = 5 * time.Second ) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2abde1bd9..9c2f8b213 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -68,7 +68,7 @@ func (vs *VolumeServer) heartbeat() { master = newLeader } vs.store.MasterAddress = master - newLeader, err = vs.doHeartbeatWithRetry(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second, duplicateRetryCount) + newLeader, err = vs.doHeartbeatWithRetry(master, grpcDialOption, vs.pulsePeriod, duplicateRetryCount) if err != nil { glog.V(0).Infof("heartbeat to %s error: %v", master, err) @@ -81,7 +81,7 @@ func (vs *VolumeServer) heartbeat() { } else { // Regular error, reset duplicate retry count duplicateRetryCount = 0 - time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + time.Sleep(vs.pulsePeriod) } newLeader = "" diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 66c62b98c..4f8a7fb0d 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -35,7 +35,7 @@ type VolumeServer struct { SeedMasterNodes []pb.ServerAddress whiteList []string currentMaster pb.ServerAddress - pulseSeconds int + pulsePeriod time.Duration dataCenter string rack string store *storage.Store @@ -59,7 +59,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType, idxFolder string, needleMapKind storage.NeedleMapKind, - masterNodes []pb.ServerAddress, pulseSeconds int, + masterNodes []pb.ServerAddress, pulsePeriod time.Duration, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, @@ -86,7 +86,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") vs := &VolumeServer{ - pulseSeconds: pulseSeconds, + pulsePeriod: pulsePeriod, dataCenter: dataCenter, rack: rack, needleMapKind: needleMapKind, diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index a8cc72d4d..741df0dd4 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -88,7 +88,8 @@ func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, b return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB } -func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTimeAtSecond int64, syncDeletions, verbose bool) bool { +func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTime time.Time, syncDeletions, verbose bool) bool { + pulseTimeAtSecond := pulseTime.Unix() doSyncDeletedCount := false if syncDeletions && a.info.DeleteCount != b.info.DeleteCount { doSyncDeletedCount = true @@ -135,7 +136,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write c.writer = writer // collect topology information - pulseTimeAtSecond := time.Now().Unix() - constants.VolumePulseSeconds*2 + pulseTime := time.Now().Add(-constants.VolumePulsePeriod * 2) topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err @@ -162,7 +163,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write }) for len(writableReplicas) >= 2 { a, b := writableReplicas[0], writableReplicas[1] - if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { + if !*slowMode && c.shouldSkipVolume(a, b, pulseTime, *syncDeletions, *verbose) { // always choose the larger volume to be the source writableReplicas = append(replicas[:1], writableReplicas[2:]...) continue diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go index ab9832bd4..d86b40f1f 100644 --- a/weed/shell/command_volume_check_disk_test.go +++ b/weed/shell/command_volume_check_disk_test.go @@ -1,9 +1,11 @@ package shell import ( - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "os" "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) type testCommandVolumeCheckDisk struct { @@ -65,7 +67,8 @@ func TestShouldSkipVolume(t *testing.T) { }, } for num, tt := range tests { - if isShould := cmdVolumeCheckDisk.shouldSkipVolume(&tt.a, &tt.b, tt.pulseTimeAtSecond, true, true); isShould != tt.shouldSkipVolume { + pulseTime := time.Unix(tt.pulseTimeAtSecond, 0) + if isShould := cmdVolumeCheckDisk.shouldSkipVolume(&tt.a, &tt.b, pulseTime, true, true); isShould != tt.shouldSkipVolume { t.Fatalf("result of should skip volume is unexpected for %d test", num) } } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 2a71c6e23..5442ccdce 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -152,9 +152,9 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo } }() - for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) { + for !topo.LastLeaderChangeTime.Add(constants.VolumePulsePeriod * 2).Before(time.Now()) { glog.V(0).Infof("wait for volume servers to join back") - time.Sleep(constants.VolumePulseSeconds / 2) + time.Sleep(constants.VolumePulsePeriod / 2) } vid, raftErr := topo.NextVolumeId() if raftErr != nil {