Browse Source

refactoring

pull/2/head
Chris Lu 12 years ago
parent
commit
11cc489ca5
  1. 4
      weed-fs/src/cmd/weed/fix.go
  2. 16
      weed-fs/src/pkg/storage/needle.go
  3. 32
      weed-fs/src/pkg/storage/needle_read_write.go
  4. 30
      weed-fs/src/pkg/storage/volume.go

4
weed-fs/src/cmd/weed/fix.go

@ -52,7 +52,7 @@ func runFix(cmd *Command, args []string) bool {
ver, _, _ := storage.ParseSuperBlock(header) ver, _, _ := storage.ParseSuperBlock(header)
n, rest := storage.ReadNeedle(dataFile, ver)
n, rest := storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1) dataFile.Seek(int64(rest), 1)
nm := storage.NewNeedleMap(indexFile) nm := storage.NewNeedleMap(indexFile)
offset := uint32(storage.SuperBlockSize) offset := uint32(storage.SuperBlockSize)
@ -63,7 +63,7 @@ func runFix(cmd *Command, args []string) bool {
debug("saved", count, "with error", pe) debug("saved", count, "with error", pe)
} }
offset += rest + 16 offset += rest + 16
n, rest = storage.ReadNeedle(dataFile, ver)
n, rest = storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1) dataFile.Seek(int64(rest), 1)
} }
return true return true

16
weed-fs/src/pkg/storage/needle.go

@ -11,16 +11,22 @@ import (
"strings" "strings"
) )
const (
NeedleHeaderSize = 16 //should never change this
NeedlePaddingSize = 8
)
type Needle struct { type Needle struct {
Cookie uint32 "random number to mitigate brute force lookups" Cookie uint32 "random number to mitigate brute force lookups"
Id uint64 "needle id" Id uint64 "needle id"
Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime" Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime"
// DataSize uint32 "Data size"
Flags byte "boolean flags" //version2
DataSize uint32 "Data size" //version2
Data []byte "The actual file data" Data []byte "The actual file data"
// NameSize uint16
// Name []byte "maximum 256 characters"
// MimeSize uint16
// Mime []byte "maximum 256 characters"
NameSize uint8 //version2
Name []byte "maximum 256 characters" //version2
MimeSize uint8 //version2
Mime []byte "maximum 256 characters" //version2
Checksum CRC "CRC32 to check integrity" Checksum CRC "CRC32 to check integrity"
Padding []byte "Aligned to 8 bytes" Padding []byte "Aligned to 8 bytes"
} }

32
weed-fs/src/pkg/storage/needle_read_write.go

@ -7,22 +7,25 @@ import (
"pkg/util" "pkg/util"
) )
func (n *Needle) Append(w io.Writer) uint32 {
header := make([]byte, 16)
func (n *Needle) Append(w io.Writer, version Version) uint32 {
if version == Version1 {
header := make([]byte, NeedleHeaderSize)
util.Uint32toBytes(header[0:4], n.Cookie) util.Uint32toBytes(header[0:4], n.Cookie)
util.Uint64toBytes(header[4:12], n.Id) util.Uint64toBytes(header[4:12], n.Id)
n.Size = uint32(len(n.Data)) n.Size = uint32(len(n.Data))
util.Uint32toBytes(header[12:16], n.Size) util.Uint32toBytes(header[12:16], n.Size)
w.Write(header) w.Write(header)
w.Write(n.Data) w.Write(n.Data)
rest := 8 - ((16 + n.Size + 4) % 8)
rest := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + 4) % NeedlePaddingSize)
util.Uint32toBytes(header[0:4], n.Checksum.Value()) util.Uint32toBytes(header[0:4], n.Checksum.Value())
w.Write(header[0 : 4+rest]) w.Write(header[0 : 4+rest])
} else if version == Version2 {
}
return n.Size return n.Size
} }
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) { func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
if version == Version1 { if version == Version1 {
bytes := make([]byte, 16+size+4)
bytes := make([]byte, NeedleHeaderSize+size+4)
ret, e := r.Read(bytes) ret, e := r.Read(bytes)
n.Cookie = util.BytesToUint32(bytes[0:4]) n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12]) n.Id = util.BytesToUint64(bytes[4:12])
@ -37,10 +40,10 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
} }
return 0, errors.New("Unsupported Version!") return 0, errors.New("Unsupported Version!")
} }
func ReadNeedle(r *os.File, version Version) (n *Needle, bytesTillNextFile uint32) {
func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32) {
n = new(Needle) n = new(Needle)
if version == Version1 { if version == Version1 {
bytes := make([]byte, 16)
bytes := make([]byte, NeedleHeaderSize)
count, e := r.Read(bytes) count, e := r.Read(bytes)
if count <= 0 || e != nil { if count <= 0 || e != nil {
return nil, 0 return nil, 0
@ -48,8 +51,21 @@ func ReadNeedle(r *os.File, version Version) (n *Needle, bytesTillNextFile uint3
n.Cookie = util.BytesToUint32(bytes[0:4]) n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12]) n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:16]) n.Size = util.BytesToUint32(bytes[12:16])
rest := 8 - ((n.Size + 16 + 4) % 8)
bytesTillNextFile = n.Size + 4 + rest
rest := NeedlePaddingSize - ((n.Size + 16 + 4) % NeedlePaddingSize)
bodyLength = n.Size + 4 + rest
} else if version == Version2 {
}
return
}
//n should be a needle already read the header
//the input stream will read until next file entry
func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) {
if version == Version1 {
bytes := make([]byte, bodyLength)
r.Read(bytes)
n.Data = bytes[:n.Size]
n.Checksum = NewCRC(n.Data)
} else if version == Version2 { } else if version == Version2 {
} }
return return

30
weed-fs/src/pkg/storage/volume.go

@ -105,7 +105,7 @@ func (v *Volume) write(n *Needle) uint32 {
v.accessLock.Lock() v.accessLock.Lock()
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
offset, _ := v.dataFile.Seek(0, 2) offset, _ := v.dataFile.Seek(0, 2)
ret := n.Append(v.dataFile)
ret := n.Append(v.dataFile, v.version)
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*8 < offset { if !ok || int64(nv.Offset)*8 < offset {
v.nm.Put(n.Id, uint32(offset/8), n.Size) v.nm.Put(n.Id, uint32(offset/8), n.Size)
@ -119,8 +119,8 @@ func (v *Volume) delete(n *Needle) uint32 {
//log.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) //log.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok { if ok {
v.nm.Delete(n.Id) v.nm.Delete(n.Id)
v.dataFile.Seek(int64(nv.Offset*8), 0)
n.Append(v.dataFile)
v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
n.Append(v.dataFile, v.version)
return nv.Size return nv.Size
} }
return 0 return 0
@ -131,7 +131,7 @@ func (v *Volume) read(n *Needle) (int, error) {
defer v.accessLock.Unlock() defer v.accessLock.Unlock()
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if ok && nv.Offset > 0 { if ok && nv.Offset > 0 {
v.dataFile.Seek(int64(nv.Offset)*8, 0)
v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
return n.Read(v.dataFile, nv.Size, v.version) return n.Read(v.dataFile, nv.Size, v.version)
} }
return -1, errors.New("Not Found") return -1, errors.New("Not Found")
@ -192,30 +192,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
version, _, _ := ParseSuperBlock(header) version, _, _ := ParseSuperBlock(header)
n, rest := ReadNeedle(src, version)
n, rest := ReadNeedleHeader(src, version)
nm := NewNeedleMap(idx) nm := NewNeedleMap(idx)
old_offset := uint32(SuperBlockSize) old_offset := uint32(SuperBlockSize)
new_offset := uint32(SuperBlockSize) new_offset := uint32(SuperBlockSize)
for n != nil { for n != nil {
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
//log.Println("file size is", n.Size, "rest", rest) //log.Println("file size is", n.Size, "rest", rest)
if !ok || nv.Offset*8 != old_offset {
if !ok || nv.Offset*NeedlePaddingSize != old_offset {
src.Seek(int64(rest), 1) src.Seek(int64(rest), 1)
} else { } else {
if nv.Size > 0 { if nv.Size > 0 {
nm.Put(n.Id, new_offset/8, n.Size)
bytes := make([]byte, n.Size+4)
src.Read(bytes)
n.Data = bytes[:n.Size]
n.Checksum = NewCRC(n.Data)
n.Append(dst)
new_offset += rest + 16
nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size)
n.ReadNeedleBody(src, version, rest)
n.Append(dst, v.version)
new_offset += rest + NeedleHeaderSize
//log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest) //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
} else {
src.Seek(int64(rest), 1)
} }
src.Seek(int64(rest-n.Size-4), 1)
} }
old_offset += rest + 16
n, rest = ReadNeedle(src, version)
old_offset += rest + NeedleHeaderSize
n, rest = ReadNeedleHeader(src, version)
} }
return nil return nil

Loading…
Cancel
Save