From 44c4e74655a44e28c0485b883c27ee42b78df5d6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Aug 2013 16:53:32 -0700 Subject: [PATCH] correct and more cleaner logic to fall back to read only mode checking file permissions directly since the try and catch exception approach does not work consistently as seen in bug #41 --- go/storage/volume.go | 107 +++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 40 deletions(-) diff --git a/go/storage/volume.go b/go/storage/volume.go index 0f781d8d4..b2c8a2337 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -9,6 +9,7 @@ import ( "os" "path" "sync" + "time" ) const ( @@ -54,61 +55,42 @@ func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) { func (v *Volume) load(alsoLoadIndex bool) error { var e error fileName := path.Join(v.dir, v.Id.String()) - v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead { + return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) + } else if canWrite { + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + } else { + glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") + v.dataFile, e = os.Open(fileName + ".dat") + v.readOnly = true + } if e != nil { if !os.IsPermission(e) { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e.Error()) - } - if v.dataFile, e = os.Open(fileName + ".dat"); e != nil { - return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e.Error()) + return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error()) } - glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") - v.readOnly = true } + if v.ReplicaType == CopyNil { e = v.readSuperBlock() } else { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - var indexFile *os.File if v.readOnly { - glog.V(2).Infoln("opening file", fileName+".idx") - if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) { - return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e.Error()) - } - if indexFile != nil { - glog.V(2).Infoln("check file", fileName+".cdb") - if _, err := os.Stat(fileName + ".cdb"); os.IsNotExist(err) { - glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, e.Error()) - } else { - indexFile.Close() - indexFile = nil - } - } - } - glog.V(2).Infoln("open file", fileName+".cdb") - if v.nm, e = OpenCdbMap(fileName + ".cdb"); e != nil { - if os.IsNotExist(e) { - glog.V(0).Infof("Failed to read cdb file %s, fall back to normal readonly mode.", fileName) - } else { - glog.V(0).Infof("%s.cdb open errro:%s", fileName, e.Error()) - return e - } + if v.ensureConvertIdxToCdb(fileName) { + v.nm, e = OpenCdbMap(fileName + ".cdb") + return e } } + var indexFile *os.File if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") - indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644) - if e != nil { + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil { return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error()) } } else { glog.V(1).Infoln("open to write file", fileName+".idx") - indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if e != nil { + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil { return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error()) } } @@ -160,11 +142,11 @@ func (v *Volume) maybeWriteSuperBlock() error { } func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err) + return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err.Error()) } header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e) + return fmt.Errorf("cannot read superblock: %s", e.Error()) } v.SuperBlock, err = ParseSuperBlock(header) return err @@ -172,7 +154,7 @@ func (v *Volume) readSuperBlock() (err error) { func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.Version = Version(header[0]) if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err.Error()) } return } @@ -221,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { if size, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { - err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e) + err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error()) } return } @@ -403,3 +385,48 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro func (v *Volume) ContentSize() uint64 { return v.nm.ContentSize() } + +func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) { + exists = true + fi, err := os.Stat(filename) + if os.IsNotExist(err) { + exists = false + return + } + if fi.Mode()&0400 != 0 { + canRead = true + } + if fi.Mode()&0200 != 0 { + canWrite = true + } + modTime = fi.ModTime() + return +} +func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { + var indexFile *os.File + var e error + _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb") + _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx") + if cdbCanRead && cdbModTime.After(idxModeTime) { + return true + } + if !cdbCanWrite { + return false + } + if !idxCanRead { + glog.V(0).Infoln("Can not read file", fileName+".idx!") + return false + } + glog.V(2).Infoln("opening file", fileName+".idx") + if indexFile, e = os.Open(fileName + ".idx"); e != nil { + glog.V(0).Infoln("Failed to read file", fileName+".idx !") + return false + } + defer indexFile.Close() + glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) + if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { + glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error()) + return false + } + return true +}