diff --git a/weed-fs/src/cmd/fix_volume_index.go b/weed-fs/src/cmd/fix_volume_index.go new file mode 100644 index 000000000..94cbc4b50 --- /dev/null +++ b/weed-fs/src/cmd/fix_volume_index.go @@ -0,0 +1,59 @@ +package main + +import ( + "storage" + "flag" + "log" + "os" + "path" + "strconv" +) + +var ( + dir = flag.String("dir", "/tmp", "data directory to store files") + volumeId = flag.Int("volumeId", -1, "volume id") + IsDebug = flag.Bool("debug", false, "enable debug mode") + + store *storage.Store +) + +func main() { + flag.Parse() + + if *volumeId == -1 { + flag.Usage() + return + } + + fileName := strconv.Itoa(*volumeId) + dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) + if e != nil { + log.Fatalf("Read Volume [ERROR] %s\n", e) + } + defer dataFile.Close() + indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + if ie != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", ie) + } + defer indexFile.Close() + + //skip the volume super block + dataFile.Seek(storage.SuperBlockSize, 0) + + n, length := storage.ReadNeedle(dataFile) + nm := storage.NewNeedleMap(indexFile) + offset := uint32(storage.SuperBlockSize) + for n != nil { + if *IsDebug { + log.Println("key", n.Key, "volume offset", offset, "data_size", n.Size, "length", length) + } + if n.Size > 0 { + count, pe := nm.Put(n.Key, offset/8, n.Size) + if *IsDebug { + log.Println("saved", count, "with error", pe) + } + } + offset += length + n, length = storage.ReadNeedle(dataFile) + } +} diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go index 442441506..2591b1fb8 100644 --- a/weed-fs/src/cmd/weedc.go +++ b/weed-fs/src/cmd/weedc.go @@ -56,7 +56,15 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { if *IsDebug { log.Println("volume", volumeId, "reading", n) } - store.Read(volumeId, n) + cookie := n.Cookie + count, e := store.Read(volumeId, n) + if *IsDebug { + log.Println("read bytes", count, "error", e) + } + if n.Cookie != cookie { + log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } if dotIndex > 0 { ext := path[dotIndex:] w.Header().Set("Content-Type", mime.TypeByExtension(ext)) @@ -78,7 +86,35 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { } } func DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + path := r.URL.Path + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + dotIndex := strings.LastIndex(path[sepIndex:], ".") + fid := path[commaIndex+1:] + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + } + if commaIndex <= 0 { + log.Println("unknown file id", path[sepIndex+1:commaIndex]) + return + } + volumeId, _ := strconv.Atoui64(path[sepIndex+1 : commaIndex]) + n.ParsePath(fid) + + cookie := n.Cookie + count, _ := store.Read(volumeId, n) + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + store.Write(volumeId, n) + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJson(w, r, m) } func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go index a3a48b793..c643e7651 100644 --- a/weed-fs/src/pkg/storage/needle.go +++ b/weed-fs/src/pkg/storage/needle.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "http" "log" + "os" "strings" ) @@ -60,14 +61,29 @@ func (n *Needle) Append(w io.Writer) (uint32){ w.Write(header[0 : rest+4]) return n.Size } -func (n *Needle) Read(r io.Reader, size uint32) { +func (n *Needle) Read(r io.Reader, size uint32)(int, os.Error) { bytes := make([]byte, size+16+4) - r.Read(bytes) + ret, e := r.Read(bytes) n.Cookie = BytesToUint32(bytes[0:4]) n.Key = BytesToUint64(bytes[4:12]) n.Size = BytesToUint32(bytes[12:16]) n.Data = bytes[16 : 16+size] n.Checksum = int32(BytesToUint32(bytes[16+size : 16+size+4])) + return ret, e +} +func ReadNeedle(r *os.File) (*Needle, uint32) { + n := new(Needle) + bytes := make([]byte, 16) + count, e := r.Read(bytes) + if count<=0 || e!=nil { + return nil, 0 + } + n.Cookie = BytesToUint32(bytes[0:4]) + n.Key = BytesToUint64(bytes[4:12]) + n.Size = BytesToUint32(bytes[12:16]) + rest := 8 - ((n.Size + 16 + 4) % 8) + r.Seek(int64(n.Size+4+rest), 1) + return n, 16+n.Size+4+rest } func ParseKeyHash(key_hash_string string)(uint64,uint32) { key_hash_bytes, khe := hex.DecodeString(key_hash_string) diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index ef7764174..f870bd497 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -16,8 +16,13 @@ type NeedleMap struct { bytes []byte } -func NewNeedleMap() *NeedleMap { - return &NeedleMap{m: make(map[uint64]*NeedleValue), bytes: make([]byte, 16)} +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: make(map[uint64]*NeedleValue), + bytes: make([]byte, 16), + indexFile: file, + } + return nm } const ( @@ -25,12 +30,11 @@ const ( ) func LoadNeedleMap(file *os.File) *NeedleMap { - nm := NewNeedleMap() - nm.indexFile = file + nm := NewNeedleMap(file) bytes := make([]byte, 16*RowsToRead) count, e := nm.indexFile.Read(bytes) if count > 0 { - fstat, _ := file.Stat() + fstat, _ := file.Stat() log.Println("Loading index file", fstat.Name, "size", fstat.Size) } for count > 0 && e == nil { @@ -38,23 +42,35 @@ func LoadNeedleMap(file *os.File) *NeedleMap { key := BytesToUint64(bytes[i : i+8]) offset := BytesToUint32(bytes[i+8 : i+12]) size := BytesToUint32(bytes[i+12 : i+16]) - nm.m[key] = &NeedleValue{Offset: offset, Size: size} + if offset > 0 { + nm.m[key] = &NeedleValue{Offset: offset, Size: size} + } } count, e = nm.indexFile.Read(bytes) } return nm } -func (nm *NeedleMap) put(key uint64, offset uint32, size uint32) { - nm.m[key] = &NeedleValue{Offset: offset, Size: size} +func (nm *NeedleMap) PutInMap(key uint64, offset uint32, size uint32) { + nm.m[key] = &NeedleValue{Offset: offset, Size: size} +} +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, os.Error) { + nm.PutInMap(key,offset,size) Uint64toBytes(nm.bytes[0:8], key) Uint32toBytes(nm.bytes[8:12], offset) Uint32toBytes(nm.bytes[12:16], size) - nm.indexFile.Write(nm.bytes) + return nm.indexFile.Write(nm.bytes) } -func (nm *NeedleMap) get(key uint64) (element *NeedleValue, ok bool) { +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m[key] return } +func (nm *NeedleMap) Delete(key uint64) { + nm.m[key] = nil, false + Uint64toBytes(nm.bytes[0:8], key) + Uint32toBytes(nm.bytes[8:12], 0) + Uint32toBytes(nm.bytes[12:16], 0) + nm.indexFile.Write(nm.bytes) +} func (nm *NeedleMap) Close() { nm.indexFile.Close() } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index d2b9ab1eb..2a49124c4 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -4,6 +4,7 @@ import ( "log" "io/ioutil" "json" + "os" "strings" "strconv" "url" @@ -86,6 +87,6 @@ func (s *Store) Close() { func (s *Store) Write(i uint64, n *Needle) (uint32){ return s.volumes[i].write(n) } -func (s *Store) Read(i uint64, n *Needle) { - s.volumes[i].read(n) +func (s *Store) Read(i uint64, n *Needle) (int, os.Error){ + return s.volumes[i].read(n) } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 22b653b71..438a0f56e 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -4,35 +4,38 @@ import ( "os" "path" "strconv" + "sync" "log" ) +const ( + SuperBlockSize = 8 +) + type Volume struct { Id uint32 dir string dataFile *os.File nm *NeedleMap - accessChannel chan int + accessLock sync.Mutex } func NewVolume(dirname string, id uint32) (v *Volume) { var e os.Error - v = &Volume{dir: dirname, Id: id, nm: NewNeedleMap()} + v = &Volume{dir: dirname, Id: id} fileName := strconv.Uitoa64(uint64(v.Id)) v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) } + v.maybeWriteSuperBlock() indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) if ie != nil { - log.Fatalf("New Volume [ERROR] %s\n", ie) + log.Fatalf("Write Volume Index [ERROR] %s\n", ie) } v.nm = LoadNeedleMap(indexFile) - v.accessChannel = make(chan int, 1) - v.accessChannel <- 0 - return } func (v *Volume) Size() int64 { @@ -43,28 +46,41 @@ func (v *Volume) Size() int64 { return -1 } func (v *Volume) Close() { - close(v.accessChannel) v.nm.Close() v.dataFile.Close() } +func (v *Volume) maybeWriteSuperBlock() { + stat, _ := v.dataFile.Stat() + if stat.Size == 0 { + header := make([]byte, SuperBlockSize) + header[0] = 1 + v.dataFile.Write(header) + } +} -func (v *Volume) write(n *Needle) uint32{ - counter := <-v.accessChannel +func (v *Volume) write(n *Needle) uint32 { + v.accessLock.Lock() + defer v.accessLock.Unlock() offset, _ := v.dataFile.Seek(0, 2) ret := n.Append(v.dataFile) - nv, ok := v.nm.get(n.Key) + nv, ok := v.nm.Get(n.Key) if !ok || int64(nv.Offset)*8 < offset { - v.nm.put(n.Key, uint32(offset/8), n.Size) + v.nm.Put(n.Key, uint32(offset/8), n.Size) } - v.accessChannel <- counter + 1 return ret } -func (v *Volume) read(n *Needle) { - counter := <-v.accessChannel - nv, ok := v.nm.get(n.Key) +func (v *Volume) read(n *Needle) (int, os.Error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + nv, ok := v.nm.Get(n.Key) if ok && nv.Offset > 0 { v.dataFile.Seek(int64(nv.Offset)*8, 0) - n.Read(v.dataFile, nv.Size) + return n.Read(v.dataFile, nv.Size) } - v.accessChannel <- counter + 1 + return -1, os.EOF +} +func (v *Volume) delete(n *Needle) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + v.nm.Delete(n.Key) }