diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index b278cf7d1..c606ca811 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -2,6 +2,7 @@ package storage import ( "errors" + "fmt" "log" "os" "path" @@ -10,6 +11,7 @@ import ( const ( SuperBlockSize = 8 + Version = 1 ) type Volume struct { @@ -20,6 +22,8 @@ type Volume struct { replicaType ReplicationType + version uint8 + accessLock sync.Mutex } @@ -28,23 +32,26 @@ func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v v.load() return } -func (v *Volume) load() { +func (v *Volume) load() error { var e error fileName := path.Join(v.dir, v.Id.String()) v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) if e != nil { - log.Fatalf("New Volume [ERROR] %s\n", e) + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } if v.replicaType == CopyNil { - v.readSuperBlock() + if e = v.readSuperBlock(); e != nil { + return e + } } else { v.maybeWriteSuperBlock() } indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) if ie != nil { - log.Fatalf("Write Volume Index [ERROR] %s\n", ie) + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) } v.nm = LoadNeedleMap(indexFile) + return nil } func (v *Volume) Size() int64 { stat, e := v.dataFile.Stat() @@ -61,17 +68,23 @@ func (v *Volume) maybeWriteSuperBlock() { stat, _ := v.dataFile.Stat() if stat.Size() == 0 { header := make([]byte, SuperBlockSize) - header[0] = 1 + header[0] = Version header[1] = v.replicaType.Byte() v.dataFile.Write(header) } } -func (v *Volume) readSuperBlock() { +func (v *Volume) readSuperBlock() error { v.dataFile.Seek(0, 0) header := make([]byte, SuperBlockSize) - if _, error := v.dataFile.Read(header); error == nil { - v.replicaType, _ = NewReplicationTypeFromByte(header[1]) + if _, e := v.dataFile.Read(header); e != nil { + return fmt.Errorf("cannot read superblock: %s", e) } + v.version = header[0] + var err error + if v.replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { + return fmt.Errorf("cannot read replica type: %s", err) + } + return nil } func (v *Volume) NeedToReplicate() bool { return v.replicaType.GetCopyCount() > 1 @@ -123,9 +136,16 @@ func (v *Volume) commitCompact() (int, error) { v.accessLock.Lock() defer v.accessLock.Unlock() v.dataFile.Close() - os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")) - os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")) - v.load() + var e error + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { + return 0, e + } + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { + return 0, e + } + if e = v.load(); e != nil { + return 0, e + } return 0, nil }