diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index e86c33bda..cc06f0092 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -58,6 +58,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/volume/mount", vs.guard.WhiteList(vs.getVolumeMountHandler)) + adminMux.HandleFunc("/admin/volume/unmount", vs.guard.WhiteList(vs.getVolumeUnmountHandler)) + adminMux.HandleFunc("/admin/volume/delete", vs.guard.WhiteList(vs.getVolumeDeleteHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 28631dac7..79bb89756 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -1,6 +1,7 @@ package weed_server import ( + "fmt" "net/http" "path/filepath" "strconv" @@ -8,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/storage" ) func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { @@ -65,3 +67,45 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) m["DiskStatuses"] = ds writeJsonQuiet(w, r, http.StatusOK, m) } + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + vid, err := vs.getVolumeId(volumeParameterName, r) + if err != nil { + return nil, err + } + v := vs.store.GetVolume(vid) + if v == nil { + return nil, fmt.Errorf("Not Found Volume Id %d", vid) + } + return v, nil +} + +func (vs *VolumeServer) getVolumeMountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.MountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume mounted") +} + +func (vs *VolumeServer) getVolumeUnmountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.UnmountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume unmounted") +} + +func (vs *VolumeServer) getVolumeDeleteHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.DeleteVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume deleted") +} diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 8a2e30743..68c381e28 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -68,20 +68,19 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht w.Write(content) } -func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { +func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) (storage.VolumeId, error) { volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) - return nil, err + return 0, err } + vid, err := storage.NewVolumeId(volumeIdString) if err != nil { err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - return nil, err - } - v := vs.store.GetVolume(vid) - if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + return 0, err } - return v, nil -} + + return vid, err +} \ No newline at end of file diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 496e0dd57..9b9468c5b 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" ) type DiskLocation struct { @@ -22,16 +23,27 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { return location } -func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { +func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { collection := "" - base := name[:len(name)-len(".dat")] + base := name[:len(name) - len(".dat")] i := strings.LastIndex(base, "_") if i > 0 { - collection, base = base[0:i], base[i+1:] + collection, base = base[0:i], base[i + 1:] } - if vid, err := NewVolumeId(base); err == nil { + vol, err := NewVolumeId(base); + return vol, collection, err + } + + return 0, "", fmt.Errorf("Path is not a volume: %s", name) +} + +func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + vid, collection, err := l.volumeIdFromPath(dir) + if err == nil { mutex.RLock() _, found := l.volumes[vid] mutex.RUnlock() @@ -42,10 +54,6 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM mutex.Unlock() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) - if v.Size() != v.dataFileSize { - glog.V(0).Infof("data file %s, size=%d expected=%d", - l.Directory+"/"+name, v.Size(), v.dataFileSize) - } } else { glog.V(0).Infof("new volume %s error %s", name, e) } @@ -125,6 +133,38 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { return } +func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool { + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + volId, _, err := l.volumeIdFromPath(dir) + if vid == volId && err == nil { + var mutex sync.RWMutex + l.loadExistingVolume(dir, needleMapKind, &mutex) + return true + } + } + } + + return false +} + +func (l *DiskLocation) DeleteVolume(vid VolumeId) (error) { + _, ok := l.volumes[vid] + if !ok { + return fmt.Errorf("Volume not found, VolumeId: %d", vid) + } + return l.deleteVolumeById(vid) +} + +func (l *DiskLocation) UnloadVolume(vid VolumeId) (error) { + _, ok := l.volumes[vid] + if !ok { + return fmt.Errorf("Volume not loaded, VolumeId: %d", vid) + } + delete(l.volumes, vid) + return nil +} + func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { l.Lock() defer l.Unlock() diff --git a/weed/storage/store.go b/weed/storage/store.go index f70ebe0ee..58c8de0d7 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -77,6 +77,7 @@ type Store struct { connected bool VolumeSizeLimit uint64 //read from the master Client pb.Seaweed_SendHeartbeatClient + NeedleMapType NeedleMapType } func (s *Store) String() (str string) { @@ -85,7 +86,7 @@ func (s *Store) String() (str string) { } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) @@ -261,6 +262,7 @@ func (s *Store) Close() { location.Close() } } + func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { @@ -275,11 +277,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit) - if s.Client != nil { - if e := s.Client.Send(s.CollectHeartbeat()); e != nil { - glog.V(0).Infoln("error when reporting size:", e) - } - } + s.updateMaster() } return } @@ -287,17 +285,27 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { err = fmt.Errorf("Volume %d not found!", i) return } + +func (s *Store) updateMaster() { + if s.Client != nil { + if e := s.Client.Send(s.CollectHeartbeat()); e != nil { + glog.V(0).Infoln("error when reporting size:", e) + } + } +} + func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } return 0, nil } + func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { if v := s.findVolume(i); v != nil { return v.readNeedle(n) } - return 0, fmt.Errorf("Volume %v not found!", i) + return 0, fmt.Errorf("Volume %d not found!", i) } func (s *Store) GetVolume(i VolumeId) *Volume { return s.findVolume(i) @@ -307,3 +315,36 @@ func (s *Store) HasVolume(i VolumeId) bool { v := s.findVolume(i) return v != nil } + +func (s *Store) MountVolume(i VolumeId) error { + for _, location := range s.Locations { + if found := location.LoadVolume(i, s.NeedleMapType); found == true { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} + +func (s *Store) UnmountVolume(i VolumeId) error { + for _, location := range s.Locations { + if err := location.UnloadVolume(i); err == nil { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} + +func (s *Store) DeleteVolume(i VolumeId) error { + for _, location := range s.Locations { + if error := location.deleteVolumeById(i); error == nil { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} diff --git a/weed/storage/volume.go b/weed/storage/volume.go index df9f0b7a7..f168ad155 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -15,7 +15,6 @@ type Volume struct { dir string Collection string dataFile *os.File - dataFileSize int64 nm NeedleMapper needleMapKind NeedleMapType readOnly bool diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 6d4011f27..67538ebb2 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -12,28 +12,28 @@ func getActualSize(size uint32) int64 { return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding) } -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (int64, error) { +func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (error) { var indexSize int64 var e error if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) } if indexSize == 0 { - return int64(SuperBlockSize), nil + return nil } var lastIdxEntry []byte if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := idxFileEntry(lastIdxEntry) if offset == 0 || size == TombstoneFileSize { - return 0, nil + return nil } if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil { - return 0, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) } - return int64(offset)*int64(NeedlePaddingSize) + getActualSize(size), nil + return nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index c4f1aae9b..4be860987 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -64,7 +64,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if v.dataFileSize, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { + if e = CheckVolumeDataIntegrity(v, indexFile); e != nil { v.readOnly = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 66f18557f..2314bc815 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -60,8 +60,6 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } //ensure file writing starting from aligned positions if offset%NeedlePaddingSize != 0 { @@ -69,12 +67,9 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(offset, 0); err != nil { glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } } _, err = v.dataFile.Write(b) - v.dataFileSize += int64(len(b)) return } @@ -91,12 +86,10 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(4).Infof("needle is unchanged!") return } - var offset, actualSize int64 + var offset int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) } //ensure file writing starting from aligned positions @@ -108,13 +101,12 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { } } - if size, actualSize, err = n.Append(v.dataFile, v.Version()); err != nil { + if size, _, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) } return } - v.dataFileSize += actualSize nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { @@ -139,18 +131,15 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok && nv.Size != TombstoneFileSize { size := nv.Size - // println("adding tombstone", n.Id, "at offset", v.dataFileSize) - if err := v.nm.Delete(n.Id, uint32(v.dataFileSize/NeedlePaddingSize)); err != nil { + offset, err := v.dataFile.Seek(0, 2) + if err != nil { return size, err } - if offset, err := v.dataFile.Seek(0, 2); err != nil { + if err := v.nm.Delete(n.Id, uint32(offset/NeedlePaddingSize)); err != nil { return size, err - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d", v.dataFileSize, offset, getActualSize(0)) } n.Data = nil - _, actualSize, err := n.Append(v.dataFile, v.Version()) - v.dataFileSize += actualSize + _, _, err = n.Append(v.dataFile, v.Version()) return size, err } return 0, nil diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index ae6ee7c25..fc773273d 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -56,7 +56,6 @@ func (v *Volume) maybeWriteSuperBlock() error { } } } - v.dataFileSize = SuperBlockSize } return e } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 13072d1fb..07916fe6b 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -24,7 +24,7 @@ func (v *Volume) Compact() error { v.lastCompactIndexOffset = v.nm.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", v.dataFileSize) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", 0) } func (v *Volume) Compact2() error {