Browse Source

fix some typo and naming convention

pull/843/head
bingoohuang 7 years ago
parent
commit
66a20ac21e
  1. 6
      weed/command/volume.go
  2. 12
      weed/storage/disk_location.go
  3. 18
      weed/storage/store.go
  4. 10
      weed/storage/volume.go
  5. 13
      weed/topology/store_replicate.go

6
weed/command/volume.go

@ -125,11 +125,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" { if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
} }
isSeperatedPublicPort := *v.publicPort != *v.port
isSeparatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux() volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux publicVolumeMux := volumeMux
if isSeperatedPublicPort {
if isSeparatedPublicPort {
publicVolumeMux = http.NewServeMux() publicVolumeMux = http.NewServeMux()
} }
@ -160,7 +160,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if e != nil { if e != nil {
glog.Fatalf("Volume server listener error:%v", e) glog.Fatalf("Volume server listener error:%v", e)
} }
if isSeperatedPublicPort {
if isSeparatedPublicPort {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)

12
weed/storage/disk_location.go

@ -66,20 +66,20 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) { func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) {
var concurrency int var concurrency int
if concurrentFlag { if concurrentFlag {
//You could choose a better optimized concurency value after testing at your environment
//You could choose a better optimized concurrency value after testing at your environment
concurrency = 10 concurrency = 10
} else { } else {
concurrency = 1 concurrency = 1
} }
task_queue := make(chan os.FileInfo, 10*concurrency)
taskQueue := make(chan os.FileInfo, 10*concurrency)
go func() { go func() {
if dirs, err := ioutil.ReadDir(l.Directory); err == nil { if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs { for _, dir := range dirs {
task_queue <- dir
taskQueue <- dir
} }
} }
close(task_queue)
close(taskQueue)
}() }()
var wg sync.WaitGroup var wg sync.WaitGroup
@ -88,13 +88,12 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for dir := range task_queue {
for dir := range taskQueue {
l.loadExistingVolume(dir, needleMapKind, &mutex) l.loadExistingVolume(dir, needleMapKind, &mutex)
} }
}() }()
} }
wg.Wait() wg.Wait()
} }
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
@ -202,5 +201,4 @@ func (l *DiskLocation) Close() {
for _, v := range l.volumes { for _, v := range l.volumes {
v.Close() v.Close()
} }
return
} }

18
weed/storage/store.go

@ -8,7 +8,7 @@ import (
) )
const ( const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
MaxTtlVolumeRemovalDelay = 10 // 10 minutes
) )
/* /*
@ -19,7 +19,7 @@ type Store struct {
Port int Port int
PublicUrl string PublicUrl string
Locations []*DiskLocation Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
dataCenter string //optional information, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists rack string //optional information, overwriting master setting if exists
connected bool connected bool
VolumeSizeLimit uint64 //read from the master VolumeSizeLimit uint64 //read from the master
@ -30,15 +30,16 @@ type Store struct {
} }
func (s *Store) String() (str string) { func (s *Store) String() (str string) {
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d",
s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
return return
} }
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
func NewStore(port int, ip, publicUrl string, dirNames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
s.Locations = make([]*DiskLocation, 0) s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
for i := 0; i < len(dirNames); i++ {
location := NewDiskLocation(dirNames[i], maxVolumeCounts[i])
location.loadExistingVolumes(needleMapKind) location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location) s.Locations = append(s.Locations, location)
} }
@ -46,7 +47,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
s.DeletedVolumeIdChan = make(chan VolumeId, 3) s.DeletedVolumeIdChan = make(chan VolumeId, 3)
return return
} }
func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement, ttlString string, preallocate int64) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement) rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil { if e != nil {
return e return e
@ -133,6 +134,7 @@ func (s *Store) Status() []*VolumeInfo {
func (s *Store) SetDataCenter(dataCenter string) { func (s *Store) SetDataCenter(dataCenter string) {
s.dataCenter = dataCenter s.dataCenter = dataCenter
} }
func (s *Store) SetRack(rack string) { func (s *Store) SetRack(rack string) {
s.rack = rack s.rack = rack
} }
@ -163,7 +165,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
} }
volumeMessages = append(volumeMessages, volumeMessage) volumeMessages = append(volumeMessages, volumeMessage)
} else { } else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
if v.expiredLongEnough(MaxTtlVolumeRemovalDelay) {
location.deleteVolumeById(v.Id) location.deleteVolumeById(v.Id)
glog.V(0).Infoln("volume", v.Id, "is deleted.") glog.V(0).Infoln("volume", v.Id, "is deleted.")
} else { } else {

10
weed/storage/volume.go

@ -2,6 +2,7 @@ package storage
import ( import (
"fmt" "fmt"
"github.com/shirou/gopsutil/disk"
"os" "os"
"path" "path"
"sync" "sync"
@ -134,3 +135,12 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
} }
return false return false
} }
func IsUnderDiskWaterLevel(path string, waterLevel uint64) (uint64, bool, error) {
stat, e := disk.Usage(path)
if e != nil {
return 0, true, e
}
return stat.Free, stat.Free < waterLevel, nil
}

13
weed/topology/store_replicate.go

@ -17,29 +17,30 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func ReplicatedWrite(masterNode string, s *storage.Store,
func ReplicatedWrite(masterNode string, store *storage.Store,
volumeId storage.VolumeId, needle *storage.Needle, volumeId storage.VolumeId, needle *storage.Needle,
r *http.Request) (size uint32, errorStatus string) { r *http.Request) (size uint32, errorStatus string) {
//check JWT //check JWT
jwt := security.GetJwt(r) jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId)
ret, err := store.Write(volumeId, needle)
needToReplicate := !store.HasVolume(volumeId)
if err != nil { if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")" errorStatus = "Failed to write to local disk (" + err.Error() + ")"
size = ret size = ret
return return
} }
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
volume := store.GetVolume(volumeId)
needToReplicate = needToReplicate || volume.NeedToReplicate()
if !needToReplicate { if !needToReplicate {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
needToReplicate = volume.NeedToReplicate()
} }
if needToReplicate { //send to other replica locations if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
u := url.URL{ u := url.URL{
Scheme: "http", Scheme: "http",
Host: location.Url, Host: location.Url,

Loading…
Cancel
Save