From 66a20ac21e6d2b79da417816fb1a396e9c0f14b0 Mon Sep 17 00:00:00 2001 From: bingoohuang Date: Fri, 18 Jan 2019 09:31:09 +0800 Subject: [PATCH] fix some typo and naming convention --- weed/command/volume.go | 6 +++--- weed/storage/disk_location.go | 12 +++++------- weed/storage/store.go | 18 ++++++++++-------- weed/storage/volume.go | 10 ++++++++++ weed/topology/store_replicate.go | 13 +++++++------ 5 files changed, 35 insertions(+), 24 deletions(-) diff --git a/weed/command/volume.go b/weed/command/volume.go index 27a075b5b..f7f7c9eb1 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -125,11 +125,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if *v.publicUrl == "" { *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) } - isSeperatedPublicPort := *v.publicPort != *v.port + isSeparatedPublicPort := *v.publicPort != *v.port volumeMux := http.NewServeMux() publicVolumeMux := volumeMux - if isSeperatedPublicPort { + if isSeparatedPublicPort { publicVolumeMux = http.NewServeMux() } @@ -160,7 +160,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if e != nil { glog.Fatalf("Volume server listener error:%v", e) } - if isSeperatedPublicPort { + if isSeparatedPublicPort { publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9589d9281..c30cab205 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -66,20 +66,20 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) { var concurrency int if concurrentFlag { - //You could choose a better optimized concurency value after testing at your environment + //You could choose a better optimized concurrency value after testing at your environment concurrency = 10 } else { concurrency = 1 } - task_queue := make(chan os.FileInfo, 10*concurrency) + taskQueue := make(chan os.FileInfo, 10*concurrency) go func() { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { - task_queue <- dir + taskQueue <- dir } } - close(task_queue) + close(taskQueue) }() var wg sync.WaitGroup @@ -88,13 +88,12 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con wg.Add(1) go func() { defer wg.Done() - for dir := range task_queue { + for dir := range taskQueue { l.loadExistingVolume(dir, needleMapKind, &mutex) } }() } wg.Wait() - } func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { @@ -202,5 +201,4 @@ func (l *DiskLocation) Close() { for _, v := range l.volumes { v.Close() } - return } diff --git a/weed/storage/store.go b/weed/storage/store.go index 96c819666..e5ff131be 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -8,7 +8,7 @@ import ( ) const ( - MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes + MaxTtlVolumeRemovalDelay = 10 // 10 minutes ) /* @@ -19,7 +19,7 @@ type Store struct { Port int PublicUrl string Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists + dataCenter string //optional information, overwriting master setting if exists rack string //optional information, overwriting master setting if exists connected bool VolumeSizeLimit uint64 //read from the master @@ -30,15 +30,16 @@ type Store struct { } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit) + str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", + s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit) return } -func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { +func NewStore(port int, ip, publicUrl string, dirNames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) - for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + for i := 0; i < len(dirNames); i++ { + location := NewDiskLocation(dirNames[i], maxVolumeCounts[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } @@ -46,7 +47,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts s.DeletedVolumeIdChan = make(chan VolumeId, 3) return } -func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { +func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement, ttlString string, preallocate int64) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -133,6 +134,7 @@ func (s *Store) Status() []*VolumeInfo { func (s *Store) SetDataCenter(dataCenter string) { s.dataCenter = dataCenter } + func (s *Store) SetRack(rack string) { s.rack = rack } @@ -163,7 +165,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } volumeMessages = append(volumeMessages, volumeMessage) } else { - if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + if v.expiredLongEnough(MaxTtlVolumeRemovalDelay) { location.deleteVolumeById(v.Id) glog.V(0).Infoln("volume", v.Id, "is deleted.") } else { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 07c72ecb4..ef65a53a8 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/shirou/gopsutil/disk" "os" "path" "sync" @@ -134,3 +135,12 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { } return false } + +func IsUnderDiskWaterLevel(path string, waterLevel uint64) (uint64, bool, error) { + stat, e := disk.Usage(path) + if e != nil { + return 0, true, e + } + + return stat.Free, stat.Free < waterLevel, nil +} diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index c73fb706a..8f7f72175 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -17,29 +17,30 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, +func ReplicatedWrite(masterNode string, store *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, needle) - needToReplicate := !s.HasVolume(volumeId) + ret, err := store.Write(volumeId, needle) + needToReplicate := !store.HasVolume(volumeId) if err != nil { errorStatus = "Failed to write to local disk (" + err.Error() + ")" size = ret return } - needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() + volume := store.GetVolume(volumeId) + needToReplicate = needToReplicate || volume.NeedToReplicate() if !needToReplicate { - needToReplicate = s.GetVolume(volumeId).NeedToReplicate() + needToReplicate = volume.NeedToReplicate() } if needToReplicate { //send to other replica locations if r.FormValue("type") != "replicate" { - if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error { + if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { u := url.URL{ Scheme: "http", Host: location.Url,