From 6201ed537ef11f91c24c14612f1d087796e2d5f8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Dec 2012 22:54:08 -0800 Subject: [PATCH] reporting volume size as early as possible --- weed-fs/src/cmd/weed/master.go | 3 ++ weed-fs/src/cmd/weed/volume.go | 3 +- weed-fs/src/pkg/storage/needle_map.go | 18 ++++--- weed-fs/src/pkg/storage/store.go | 68 +++++++++++++++++--------- weed-fs/src/pkg/storage/volume.go | 5 +- weed-fs/src/pkg/storage/volume_info.go | 15 +++--- 6 files changed, 74 insertions(+), 38 deletions(-) diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 92d96718a..a70f8273e 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -114,6 +114,9 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { json.Unmarshal([]byte(r.FormValue("volumes")), volumes) debug(s, "volumes", r.FormValue("volumes")) topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024 + writeJson(w, r, m) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 88841273a..f12f9f4e0 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -317,8 +317,9 @@ func runVolume(cmd *Command, args []string) bool { go func() { connected := true + store.SetMaster(*masterNode) for { - err := store.Join(*masterNode) + err := store.Join() if err == nil { if !connected { connected = true diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index ac6998337..e01c27630 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -15,7 +15,8 @@ type NeedleMap struct { deletionCounter int fileCounter int - deletionByteCounter uint32 + deletionByteCounter uint64 + fileByteCounter uint64 } func NewNeedleMap(file *os.File) *NeedleMap { @@ -44,19 +45,20 @@ func LoadNeedleMap(file *os.File) *NeedleMap { key := util.BytesToUint64(bytes[i : i+8]) offset := util.BytesToUint32(bytes[i+8 : i+12]) size := util.BytesToUint32(bytes[i+12 : i+16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) if offset > 0 { oldSize := nm.m.Set(Key(key), offset, size) //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - nm.fileCounter++ if oldSize > 0 { nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + oldSize + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) } } else { nm.m.Delete(Key(key)) //log.Println("removing key", key) nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + size + nm.deletionByteCounter = nm.deletionByteCounter + uint64(size) } } @@ -71,9 +73,10 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { util.Uint32toBytes(nm.bytes[8:12], offset) util.Uint32toBytes(nm.bytes[12:16], size) nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) if oldSize > 0 { nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + oldSize + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) } return nm.indexFile.Write(nm.bytes) } @@ -82,7 +85,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { return } func (nm *NeedleMap) Delete(key uint64) { - nm.deletionByteCounter = nm.deletionByteCounter + nm.m.Delete(Key(key)) + nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) util.Uint64toBytes(nm.bytes[0:8], key) util.Uint32toBytes(nm.bytes[8:12], 0) util.Uint32toBytes(nm.bytes[12:16], 0) @@ -92,3 +95,6 @@ func (nm *NeedleMap) Delete(key uint64) { func (nm *NeedleMap) Close() { nm.indexFile.Close() } +func (nm *NeedleMap) ContentSize() uint64 { + return nm.fileByteCounter +} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 03d4357e4..abd336d5d 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -18,6 +18,10 @@ type Store struct { Ip string PublicUrl string MaxVolumeCount int + + //read from the master + masterNode string + volumeSizeLimit uint64 } func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { @@ -70,15 +74,15 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { } func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false - } - garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) - if e != nil { - return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false - } - return nil, garbageThreshold < s.volumes[vid].garbageLevel() + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false + } + garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) + if e != nil { + return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false + } + return nil, garbageThreshold < s.volumes[vid].garbageLevel() } func (s *Store) CompactVolume(volumeIdString string) error { vid, err := NewVolumeId(volumeIdString) @@ -87,12 +91,12 @@ func (s *Store) CompactVolume(volumeIdString string) error { } return s.volumes[vid].compact() } -func (s *Store) CommitCompactVolume(volumeIdString string) (error) { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") - } - return s.volumes[vid].commitCompact() +func (s *Store) CommitCompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + return s.volumes[vid].commitCompact() } func (s *Store) loadExistingVolumes() { if dirs, err := ioutil.ReadDir(s.dir); err == nil { @@ -115,16 +119,24 @@ func (s *Store) Status() []*VolumeInfo { var stats []*VolumeInfo for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter stats = append(stats, s) } return stats } -func (s *Store) Join(mserver string) error { + +type JoinResult struct { + VolumeSizeLimit uint64 +} + +func (s *Store) SetMaster(mserver string) { + s.masterNode = mserver +} +func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), uint64(v.Size()), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) @@ -134,8 +146,16 @@ func (s *Store) Join(mserver string) error { values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) - _, err := util.Post("http://"+mserver+"/dir/join", values) - return err + jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) + if err != nil { + return err + } + var ret JoinResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + s.volumeSizeLimit = ret.VolumeSizeLimit + return nil } func (s *Store) Close() { for _, v := range s.volumes { @@ -144,9 +164,13 @@ func (s *Store) Close() { } func (s *Store) Write(i VolumeId, n *Needle) uint32 { if v := s.volumes[i]; v != nil { - return v.write(n) + size := v.write(n) + if s.volumeSizeLimit < v.ContentSize()+uint64(size) { + s.Join() + } + return size } - log.Println("volume",i, "not found!") + log.Println("volume", i, "not found!") return 0 } func (s *Store) Delete(i VolumeId, n *Needle) uint32 { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 285356e26..f7314d3ed 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -132,7 +132,7 @@ func (v *Volume) read(n *Needle) (int, error) { } func (v *Volume) garbageLevel() float64 { - return float64(v.nm.deletionByteCounter)/float64(v.Size()) + return float64(v.nm.deletionByteCounter)/float64(v.ContentSize()) } func (v *Volume) compact() error { @@ -212,3 +212,6 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) return nil } +func (v *Volume) ContentSize() uint64{ + return v.nm.fileByteCounter +} diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go index dfedb0af3..060277e0a 100644 --- a/weed-fs/src/pkg/storage/volume_info.go +++ b/weed-fs/src/pkg/storage/volume_info.go @@ -1,13 +1,12 @@ package storage -import ( -) +import () type VolumeInfo struct { - Id VolumeId - Size int64 - RepType ReplicationType - FileCount int - DeleteCount int - DeletedByteCount uint32 + Id VolumeId + Size uint64 + RepType ReplicationType + FileCount int + DeleteCount int + DeletedByteCount uint64 }