From 0c898df43029bce934562630c2b66ac9d423e41c Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 11:52:23 +0100 Subject: [PATCH 1/6] Log volumeId if dataFileSize != actual data file size --- weed/storage/volume_read_write.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 66f18557f..8964f10cf 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -61,7 +61,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { glog.V(0).Infof("failed to seek the end of file: %v", err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) } //ensure file writing starting from aligned positions if offset%NeedlePaddingSize != 0 { @@ -70,7 +70,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) } } _, err = v.dataFile.Write(b) @@ -96,7 +96,7 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(0).Infof("failed to seek the end of file: %v", err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) } //ensure file writing starting from aligned positions @@ -146,7 +146,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { if offset, err := v.dataFile.Seek(0, 2); err != nil { return size, err } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d", v.dataFileSize, offset, getActualSize(0)) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d, volumeId: %s", v.dataFileSize, offset, getActualSize(0), v.Id.String()) } n.Data = nil _, actualSize, err := n.Append(v.dataFile, v.Version()) From 18b3afc97a908b96f5891efe145aed0b2fee05bb Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 12:22:25 +0100 Subject: [PATCH 2/6] Log volumeId if dataFileSize != actual data file size - improve log types --- weed/storage/volume_read_write.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 8964f10cf..d282c7c22 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -61,7 +61,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { glog.V(0).Infof("failed to seek the end of file: %v", err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } //ensure file writing starting from aligned positions if offset%NeedlePaddingSize != 0 { @@ -70,7 +70,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } } _, err = v.dataFile.Write(b) @@ -96,7 +96,7 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(0).Infof("failed to seek the end of file: %v", err) return } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %s", v.dataFileSize, offset, v.Id.String()) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } //ensure file writing starting from aligned positions @@ -146,7 +146,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { if offset, err := v.dataFile.Seek(0, 2); err != nil { return size, err } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d, volumeId: %s", v.dataFileSize, offset, getActualSize(0), v.Id.String()) + glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d, volumeId: %v", v.dataFileSize, offset, getActualSize(0), v.Id) } n.Data = nil _, actualSize, err := n.Append(v.dataFile, v.Version()) From 492f93416d98b9d6342f10450af32185892605cf Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 12:49:20 +0100 Subject: [PATCH 3/6] Mount and unmount volumes online without restarting volume server --- weed/server/volume_server.go | 2 + weed/server/volume_server_handlers_sync.go | 37 ++++++++++++++++-- weed/storage/disk_location.go | 44 +++++++++++++++++++-- weed/storage/store.go | 45 ++++++++++++++++++---- 4 files changed, 114 insertions(+), 14 deletions(-) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index e86c33bda..bb2791622 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -58,6 +58,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/volume/mount", vs.guard.WhiteList(vs.getVolumeMountHandler)) + adminMux.HandleFunc("/admin/volume/unmount", vs.guard.WhiteList(vs.getVolumeUnmountHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 8a2e30743..438f17b25 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -68,20 +68,51 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht w.Write(content) } -func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { +func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) (storage.VolumeId, error) { volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) - return nil, err + return 0, err } + vid, err := storage.NewVolumeId(volumeIdString) if err != nil { err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + return 0, err + } + + return vid, err +} + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + vid, err := vs.getVolumeId(volumeParameterName, r) + if err != nil { return nil, err } v := vs.store.GetVolume(vid) if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + return nil, fmt.Errorf("Not Found Volume Id %d", vid) } return v, nil } + +func (vs *VolumeServer) getVolumeMountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.MountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume mounted") +} + +func (vs *VolumeServer) getVolumeUnmountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.UnmountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume unmounted") +} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 496e0dd57..8f5527d30 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" ) type DiskLocation struct { @@ -22,16 +23,27 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { return location } -func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { +func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { collection := "" - base := name[:len(name)-len(".dat")] + base := name[:len(name) - len(".dat")] i := strings.LastIndex(base, "_") if i > 0 { - collection, base = base[0:i], base[i+1:] + collection, base = base[0:i], base[i + 1:] } - if vid, err := NewVolumeId(base); err == nil { + vol, err := NewVolumeId(base); + return vol, collection, err + } + + return 0, "", fmt.Errorf("Path is not a volume: %s", name) +} + +func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + vid, collection, err := l.volumeIdFromPath(dir) + if err == nil { mutex.RLock() _, found := l.volumes[vid] mutex.RUnlock() @@ -125,6 +137,30 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { return } +func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool { + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + volId, _, err := l.volumeIdFromPath(dir) + if vid == volId && err == nil { + var mutex sync.RWMutex + l.loadExistingVolume(dir, needleMapKind, &mutex) + return true + } + } + } + + return false +} + +func (l *DiskLocation) UnloadVolume(vid VolumeId) (e error) { + _, ok := l.volumes[vid] + if !ok { + return fmt.Errorf("Volume not loaded, VolumeId: %d", vid) + } + delete(l.volumes, vid) + return nil +} + func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { l.Lock() defer l.Unlock() diff --git a/weed/storage/store.go b/weed/storage/store.go index f70ebe0ee..045b48220 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -77,6 +77,7 @@ type Store struct { connected bool VolumeSizeLimit uint64 //read from the master Client pb.Seaweed_SendHeartbeatClient + NeedleMapType NeedleMapType } func (s *Store) String() (str string) { @@ -85,7 +86,7 @@ func (s *Store) String() (str string) { } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) @@ -261,6 +262,7 @@ func (s *Store) Close() { location.Close() } } + func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { @@ -275,11 +277,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit) - if s.Client != nil { - if e := s.Client.Send(s.CollectHeartbeat()); e != nil { - glog.V(0).Infoln("error when reporting size:", e) - } - } + s.updateMaster() } return } @@ -287,17 +285,27 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { err = fmt.Errorf("Volume %d not found!", i) return } + +func (s *Store) updateMaster() { + if s.Client != nil { + if e := s.Client.Send(s.CollectHeartbeat()); e != nil { + glog.V(0).Infoln("error when reporting size:", e) + } + } +} + func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } return 0, nil } + func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { if v := s.findVolume(i); v != nil { return v.readNeedle(n) } - return 0, fmt.Errorf("Volume %v not found!", i) + return 0, fmt.Errorf("Volume %d not found!", i) } func (s *Store) GetVolume(i VolumeId) *Volume { return s.findVolume(i) @@ -307,3 +315,26 @@ func (s *Store) HasVolume(i VolumeId) bool { v := s.findVolume(i) return v != nil } + +func (s *Store) MountVolume(i VolumeId) error { + for _, location := range s.Locations { + if found := location.LoadVolume(i, s.NeedleMapType); found == true { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} + +func (s *Store) UnmountVolume(i VolumeId) error { + for _, location := range s.Locations { + if err := location.UnloadVolume(i); err == nil { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} + From e074a54a2080a13af7526c66ac60ad8a7ee9859a Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 13:02:37 +0100 Subject: [PATCH 4/6] Delete volumes online without restarting volume server --- weed/server/volume_server.go | 1 + weed/server/volume_server_handlers_admin.go | 44 +++++++++++++++++++++ weed/server/volume_server_handlers_sync.go | 34 +--------------- weed/storage/disk_location.go | 10 ++++- weed/storage/store.go | 10 +++++ 5 files changed, 65 insertions(+), 34 deletions(-) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index bb2791622..cc06f0092 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -60,6 +60,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/admin/volume/mount", vs.guard.WhiteList(vs.getVolumeMountHandler)) adminMux.HandleFunc("/admin/volume/unmount", vs.guard.WhiteList(vs.getVolumeUnmountHandler)) + adminMux.HandleFunc("/admin/volume/delete", vs.guard.WhiteList(vs.getVolumeDeleteHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 28631dac7..79bb89756 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -1,6 +1,7 @@ package weed_server import ( + "fmt" "net/http" "path/filepath" "strconv" @@ -8,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/storage" ) func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { @@ -65,3 +67,45 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) m["DiskStatuses"] = ds writeJsonQuiet(w, r, http.StatusOK, m) } + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + vid, err := vs.getVolumeId(volumeParameterName, r) + if err != nil { + return nil, err + } + v := vs.store.GetVolume(vid) + if v == nil { + return nil, fmt.Errorf("Not Found Volume Id %d", vid) + } + return v, nil +} + +func (vs *VolumeServer) getVolumeMountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.MountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume mounted") +} + +func (vs *VolumeServer) getVolumeUnmountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.UnmountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume unmounted") +} + +func (vs *VolumeServer) getVolumeDeleteHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.DeleteVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume deleted") +} diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 438f17b25..68c381e28 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -83,36 +83,4 @@ func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) } return vid, err -} - -func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { - vid, err := vs.getVolumeId(volumeParameterName, r) - if err != nil { - return nil, err - } - v := vs.store.GetVolume(vid) - if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %d", vid) - } - return v, nil -} - -func (vs *VolumeServer) getVolumeMountHandler(w http.ResponseWriter, r *http.Request) { - vid, err := vs.getVolumeId("volume", r) - if err != nil { - writeJsonError(w, r, http.StatusNotFound, err) - return - } - vs.store.MountVolume(vid) - writeJsonQuiet(w, r, http.StatusOK, "Volume mounted") -} - -func (vs *VolumeServer) getVolumeUnmountHandler(w http.ResponseWriter, r *http.Request) { - vid, err := vs.getVolumeId("volume", r) - if err != nil { - writeJsonError(w, r, http.StatusNotFound, err) - return - } - vs.store.UnmountVolume(vid) - writeJsonQuiet(w, r, http.StatusOK, "Volume unmounted") -} +} \ No newline at end of file diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 8f5527d30..a42f67ecd 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -152,7 +152,15 @@ func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) boo return false } -func (l *DiskLocation) UnloadVolume(vid VolumeId) (e error) { +func (l *DiskLocation) DeleteVolume(vid VolumeId) (error) { + _, ok := l.volumes[vid] + if !ok { + return fmt.Errorf("Volume not found, VolumeId: %d", vid) + } + return l.deleteVolumeById(vid) +} + +func (l *DiskLocation) UnloadVolume(vid VolumeId) (error) { _, ok := l.volumes[vid] if !ok { return fmt.Errorf("Volume not loaded, VolumeId: %d", vid) diff --git a/weed/storage/store.go b/weed/storage/store.go index 045b48220..58c8de0d7 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -338,3 +338,13 @@ func (s *Store) UnmountVolume(i VolumeId) error { return fmt.Errorf("Volume %d not found on disk", i) } +func (s *Store) DeleteVolume(i VolumeId) error { + for _, location := range s.Locations { + if error := location.deleteVolumeById(i); error == nil { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} From 4fb5bb09b664a9c70ee2d2c0f8aa59538dd662a4 Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 16:31:11 +0100 Subject: [PATCH 5/6] Remove obsolete property Volume.dataFileSize --- weed/storage/disk_location.go | 4 ---- weed/storage/volume.go | 1 - weed/storage/volume_loading.go | 2 +- weed/storage/volume_read_write.go | 23 ++++++----------------- weed/storage/volume_super_block.go | 1 - weed/storage/volume_vacuum.go | 2 +- 6 files changed, 8 insertions(+), 25 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index a42f67ecd..9b9468c5b 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -54,10 +54,6 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM mutex.Unlock() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) - if v.Size() != v.dataFileSize { - glog.V(0).Infof("data file %s, size=%d expected=%d", - l.Directory+"/"+name, v.Size(), v.dataFileSize) - } } else { glog.V(0).Infof("new volume %s error %s", name, e) } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index df9f0b7a7..f168ad155 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -15,7 +15,6 @@ type Volume struct { dir string Collection string dataFile *os.File - dataFileSize int64 nm NeedleMapper needleMapKind NeedleMapType readOnly bool diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index c4f1aae9b..5043c1754 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -64,7 +64,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if v.dataFileSize, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { + if _, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { v.readOnly = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index d282c7c22..2314bc815 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -60,8 +60,6 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } //ensure file writing starting from aligned positions if offset%NeedlePaddingSize != 0 { @@ -69,12 +67,9 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { if offset, err = v.dataFile.Seek(offset, 0); err != nil { glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } } _, err = v.dataFile.Write(b) - v.dataFileSize += int64(len(b)) return } @@ -91,12 +86,10 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { glog.V(4).Infof("needle is unchanged!") return } - var offset, actualSize int64 + var offset int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { glog.V(0).Infof("failed to seek the end of file: %v", err) return - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, volumeId: %v", v.dataFileSize, offset, v.Id) } //ensure file writing starting from aligned positions @@ -108,13 +101,12 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { } } - if size, actualSize, err = n.Append(v.dataFile, v.Version()); err != nil { + if size, _, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e) } return } - v.dataFileSize += actualSize nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { @@ -139,18 +131,15 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) if ok && nv.Size != TombstoneFileSize { size := nv.Size - // println("adding tombstone", n.Id, "at offset", v.dataFileSize) - if err := v.nm.Delete(n.Id, uint32(v.dataFileSize/NeedlePaddingSize)); err != nil { + offset, err := v.dataFile.Seek(0, 2) + if err != nil { return size, err } - if offset, err := v.dataFile.Seek(0, 2); err != nil { + if err := v.nm.Delete(n.Id, uint32(offset/NeedlePaddingSize)); err != nil { return size, err - } else if offset != int64(v.dataFileSize) { - glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d, volumeId: %v", v.dataFileSize, offset, getActualSize(0), v.Id) } n.Data = nil - _, actualSize, err := n.Append(v.dataFile, v.Version()) - v.dataFileSize += actualSize + _, _, err = n.Append(v.dataFile, v.Version()) return size, err } return 0, nil diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index ae6ee7c25..fc773273d 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -56,7 +56,6 @@ func (v *Volume) maybeWriteSuperBlock() error { } } } - v.dataFileSize = SuperBlockSize } return e } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 13072d1fb..07916fe6b 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -24,7 +24,7 @@ func (v *Volume) Compact() error { v.lastCompactIndexOffset = v.nm.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", v.dataFileSize) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", 0) } func (v *Volume) Compact2() error { From 0656838fe57651f59fc511ce02206ad5736d4ca9 Mon Sep 17 00:00:00 2001 From: brstgt Date: Fri, 20 Jan 2017 16:37:45 +0100 Subject: [PATCH 6/6] Don't return actual file size from CheckVolumeDataIntegrity, it will be 0 if last needle is a tombstone, so it's not reliable anyway --- weed/storage/volume_checking.go | 14 +++++++------- weed/storage/volume_loading.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 6d4011f27..67538ebb2 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -12,28 +12,28 @@ func getActualSize(size uint32) int64 { return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding) } -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (int64, error) { +func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (error) { var indexSize int64 var e error if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) } if indexSize == 0 { - return int64(SuperBlockSize), nil + return nil } var lastIdxEntry []byte if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := idxFileEntry(lastIdxEntry) if offset == 0 || size == TombstoneFileSize { - return 0, nil + return nil } if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil { - return 0, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) } - return int64(offset)*int64(NeedlePaddingSize) + getActualSize(size), nil + return nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 5043c1754..4be860987 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -64,7 +64,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if _, e = CheckVolumeDataIntegrity(v, indexFile); e != nil { + if e = CheckVolumeDataIntegrity(v, indexFile); e != nil { v.readOnly = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e) }