diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go index ef717a6b3..7a186f44f 100644 --- a/weed-fs/src/cmd/weed/fix.go +++ b/weed-fs/src/cmd/weed/fix.go @@ -1,65 +1,70 @@ package main import ( - "pkg/storage" - "log" - "os" - "path" - "strconv" + "log" + "os" + "path" + "pkg/storage" + "strconv" ) func init() { - cmdFix.Run = runFix // break init cycle - IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") + cmdFix.Run = runFix // break init cycle + IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") } var cmdFix = &Command{ - UsageLine: "fix -dir=/tmp -volumeId=234 -debug=1", - Short: "run weed tool fix on index file if corrupted", - Long: `Fix runs the WeedFS fix command to re-create the index .idx file. + UsageLine: "fix -dir=/tmp -volumeId=234 -debug=1", + Short: "run weed tool fix on index file if corrupted", + Long: `Fix runs the WeedFS fix command to re-create the index .idx file. `, } var ( - dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") - volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.") + dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") + volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.") ) func runFix(cmd *Command, args []string) bool { - if *volumeId == -1 { - return false - } + if *volumeId == -1 { + return false + } - fileName := strconv.Itoa(*volumeId) - dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) - if e != nil { - log.Fatalf("Read Volume [ERROR] %s\n", e) - } - defer dataFile.Close() - indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) - if ie != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", ie) - } - defer indexFile.Close() + fileName := strconv.Itoa(*volumeId) + dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) + if e != nil { + log.Fatalf("Read Volume [ERROR] %s\n", e) + } + defer dataFile.Close() + indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + if ie != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", ie) + } + defer indexFile.Close() - //skip the volume super block - dataFile.Seek(storage.SuperBlockSize, 0) + dataFile.Seek(0, 0) + header := make([]byte, storage.SuperBlockSize) + if _, e := dataFile.Read(header); e != nil { + log.Fatalf("cannot read superblock: %s", e) + } - n, rest := storage.ReadNeedle(dataFile) - dataFile.Seek(int64(rest), 1) - nm := storage.NewNeedleMap(indexFile) - offset := uint32(storage.SuperBlockSize) - for n != nil { - debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "rest", rest) - if n.Size > 0 { - count, pe := nm.Put(n.Id, offset/8, n.Size) - debug("saved", count, "with error", pe) - } - offset += rest+16 - n, rest = storage.ReadNeedle(dataFile) - dataFile.Seek(int64(rest), 1) - } - return true + ver, _, _ := storage.ParseSuperBlock(header) + + n, rest := storage.ReadNeedle(dataFile, ver) + dataFile.Seek(int64(rest), 1) + nm := storage.NewNeedleMap(indexFile) + offset := uint32(storage.SuperBlockSize) + for n != nil { + debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "rest", rest) + if n.Size > 0 { + count, pe := nm.Put(n.Id, offset/8, n.Size) + debug("saved", count, "with error", pe) + } + offset += rest + 16 + n, rest = storage.ReadNeedle(dataFile, ver) + dataFile.Seek(int64(rest), 1) + } + return true } diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go index bd0bebd0b..482323701 100644 --- a/weed-fs/src/pkg/storage/needle_read_write.go +++ b/weed-fs/src/pkg/storage/needle_read_write.go @@ -1,7 +1,7 @@ package storage import ( - "errors" + "errors" "io" "os" "pkg/util" @@ -15,14 +15,14 @@ func (n *Needle) Append(w io.Writer) uint32 { util.Uint32toBytes(header[12:16], n.Size) w.Write(header) w.Write(n.Data) - rest := 8 - ((n.Size + 16 + 4) % 8) + rest := 8 - ((16 + n.Size + 4) % 8) util.Uint32toBytes(header[0:4], n.Checksum.Value()) - w.Write(header[0 : rest+4]) + w.Write(header[0 : 4+rest]) return n.Size } func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { if version == Version1 { - bytes := make([]byte, size+16+4) + bytes := make([]byte, 16+size+4) ret, e := r.Read(bytes) n.Cookie = util.BytesToUint32(bytes[0:4]) n.Id = util.BytesToUint64(bytes[4:12]) @@ -33,20 +33,24 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { return 0, errors.New("CRC error! Data On Disk Corrupted!") } return ret, e - }else if version == Version2 { + } else if version == Version2 { } return 0, errors.New("Unsupported Version!") } -func ReadNeedle(r *os.File) (*Needle, uint32) { - n := new(Needle) - bytes := make([]byte, 16) - count, e := r.Read(bytes) - if count <= 0 || e != nil { - return nil, 0 +func ReadNeedle(r *os.File, version Version) (n *Needle, bytesTillNextFile uint32) { + n = new(Needle) + if version == Version1 { + bytes := make([]byte, 16) + count, e := r.Read(bytes) + if count <= 0 || e != nil { + return nil, 0 + } + n.Cookie = util.BytesToUint32(bytes[0:4]) + n.Id = util.BytesToUint64(bytes[4:12]) + n.Size = util.BytesToUint32(bytes[12:16]) + rest := 8 - ((n.Size + 16 + 4) % 8) + bytesTillNextFile = n.Size + 4 + rest + } else if version == Version2 { } - n.Cookie = util.BytesToUint32(bytes[0:4]) - n.Id = util.BytesToUint64(bytes[4:12]) - n.Size = util.BytesToUint32(bytes[12:16]) - rest := 8 - ((n.Size + 16 + 4) % 8) - return n, n.Size + 4 + rest + return } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index f5a810986..a9ffe4385 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -18,8 +18,8 @@ type Volume struct { dataFile *os.File nm *NeedleMap - replicaType ReplicationType version Version + replicaType ReplicationType accessLock sync.Mutex } @@ -51,11 +51,11 @@ func (v *Volume) load() error { return nil } func (v *Volume) Version() Version { - return CurrentVersion + return CurrentVersion } func (v *Volume) Size() int64 { - v.accessLock.Lock() - defer v.accessLock.Unlock() + v.accessLock.Lock() + defer v.accessLock.Unlock() stat, e := v.dataFile.Stat() if e == nil { return stat.Size() @@ -64,15 +64,15 @@ func (v *Volume) Size() int64 { return -1 } func (v *Volume) Close() { - v.accessLock.Lock() - defer v.accessLock.Unlock() + v.accessLock.Lock() + defer v.accessLock.Unlock() v.nm.Close() v.dataFile.Close() } func (v *Volume) maybeWriteSuperBlock() { stat, _ := v.dataFile.Stat() if stat.Size() == 0 { - v.version = CurrentVersion + v.version = CurrentVersion header := make([]byte, SuperBlockSize) header[0] = byte(v.version) header[1] = v.replicaType.Byte() @@ -85,12 +85,17 @@ func (v *Volume) readSuperBlock() error { if _, e := v.dataFile.Read(header); e != nil { return fmt.Errorf("cannot read superblock: %s", e) } - v.version = Version(header[0]) var err error - if v.replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - return fmt.Errorf("cannot read replica type: %s", err) + v.version, v.replicaType, err = ParseSuperBlock(header) + return err +} +func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) { + version = Version(header[0]) + var err error + if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { + e = fmt.Errorf("cannot read replica type: %s", err) } - return nil + return } func (v *Volume) NeedToReplicate() bool { return v.replicaType.GetCopyCount() > 1 @@ -133,7 +138,7 @@ func (v *Volume) read(n *Needle) (int, error) { } func (v *Volume) garbageLevel() float64 { - return float64(v.nm.deletionByteCounter)/float64(v.ContentSize()) + return float64(v.nm.deletionByteCounter) / float64(v.ContentSize()) } func (v *Volume) compact() error { @@ -143,7 +148,7 @@ func (v *Volume) compact() error { filePath := path.Join(v.dir, v.Id.String()) return v.copyDataAndGenerateIndexFile(filePath+".dat", filePath+".cpd", filePath+".cpx") } -func (v *Volume) commitCompact() (error) { +func (v *Volume) commitCompact() error { v.accessLock.Lock() defer v.accessLock.Unlock() v.dataFile.Close() @@ -185,7 +190,9 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) dst.Write(header) } - n, rest := ReadNeedle(src) + version, _, _ := ParseSuperBlock(header) + + n, rest := ReadNeedle(src, version) nm := NewNeedleMap(idx) old_offset := uint32(SuperBlockSize) new_offset := uint32(SuperBlockSize) @@ -208,11 +215,11 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) src.Seek(int64(rest-n.Size-4), 1) } old_offset += rest + 16 - n, rest = ReadNeedle(src) + n, rest = ReadNeedle(src, version) } return nil } -func (v *Volume) ContentSize() uint64{ - return v.nm.fileByteCounter +func (v *Volume) ContentSize() uint64 { + return v.nm.fileByteCounter }