diff --git a/go/storage/volume.go b/go/storage/volume.go index 0f781d8d4..b2c8a2337 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -9,6 +9,7 @@ import ( "os" "path" "sync" + "time" ) const ( @@ -54,61 +55,42 @@ func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) { func (v *Volume) load(alsoLoadIndex bool) error { var e error fileName := path.Join(v.dir, v.Id.String()) - v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead { + return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) + } else if canWrite { + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + } else { + glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") + v.dataFile, e = os.Open(fileName + ".dat") + v.readOnly = true + } if e != nil { if !os.IsPermission(e) { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e.Error()) - } - if v.dataFile, e = os.Open(fileName + ".dat"); e != nil { - return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e.Error()) + return fmt.Errorf("cannot load Volume Data %s.dat: %s", fileName, e.Error()) } - glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") - v.readOnly = true } + if v.ReplicaType == CopyNil { e = v.readSuperBlock() } else { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - var indexFile *os.File if v.readOnly { - glog.V(2).Infoln("opening file", fileName+".idx") - if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) { - return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e.Error()) - } - if indexFile != nil { - glog.V(2).Infoln("check file", fileName+".cdb") - if _, err := os.Stat(fileName + ".cdb"); os.IsNotExist(err) { - glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, e.Error()) - } else { - indexFile.Close() - indexFile = nil - } - } - } - glog.V(2).Infoln("open file", fileName+".cdb") - if v.nm, e = OpenCdbMap(fileName + ".cdb"); e != nil { - if os.IsNotExist(e) { - glog.V(0).Infof("Failed to read cdb file %s, fall back to normal readonly mode.", fileName) - } else { - glog.V(0).Infof("%s.cdb open errro:%s", fileName, e.Error()) - return e - } + if v.ensureConvertIdxToCdb(fileName) { + v.nm, e = OpenCdbMap(fileName + ".cdb") + return e } } + var indexFile *os.File if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") - indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644) - if e != nil { + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil { return fmt.Errorf("cannot read Volume Data %s.dat: %s", fileName, e.Error()) } } else { glog.V(1).Infoln("open to write file", fileName+".idx") - indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if e != nil { + if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil { return fmt.Errorf("cannot write Volume Data %s.dat: %s", fileName, e.Error()) } } @@ -160,11 +142,11 @@ func (v *Volume) maybeWriteSuperBlock() error { } func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err) + return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile, err.Error()) } header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e) + return fmt.Errorf("cannot read superblock: %s", e.Error()) } v.SuperBlock, err = ParseSuperBlock(header) return err @@ -172,7 +154,7 @@ func (v *Volume) readSuperBlock() (err error) { func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.Version = Version(header[0]) if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err) + err = fmt.Errorf("cannot read replica type: %s", err.Error()) } return } @@ -221,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { if size, err = n.Append(v.dataFile, v.Version()); err != nil { if e := v.dataFile.Truncate(offset); e != nil { - err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e) + err = fmt.Errorf("%s\ncannot truncate %s: %s", err, v.dataFile, e.Error()) } return } @@ -403,3 +385,48 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro func (v *Volume) ContentSize() uint64 { return v.nm.ContentSize() } + +func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time) { + exists = true + fi, err := os.Stat(filename) + if os.IsNotExist(err) { + exists = false + return + } + if fi.Mode()&0400 != 0 { + canRead = true + } + if fi.Mode()&0200 != 0 { + canWrite = true + } + modTime = fi.ModTime() + return +} +func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { + var indexFile *os.File + var e error + _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb") + _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx") + if cdbCanRead && cdbModTime.After(idxModeTime) { + return true + } + if !cdbCanWrite { + return false + } + if !idxCanRead { + glog.V(0).Infoln("Can not read file", fileName+".idx!") + return false + } + glog.V(2).Infoln("opening file", fileName+".idx") + if indexFile, e = os.Open(fileName + ".idx"); e != nil { + glog.V(0).Infoln("Failed to read file", fileName+".idx !") + return false + } + defer indexFile.Close() + glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) + if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { + glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, fileName, e.Error()) + return false + } + return true +}