From ea75165e8541a1ac602e110267e4bed2da3497eb Mon Sep 17 00:00:00 2001 From: "chris.lu@gmail.com" Date: Thu, 22 Dec 2011 04:04:47 +0000 Subject: [PATCH] works now! git-svn-id: https://weed-fs.googlecode.com/svn/trunk@20 282b0af5-e82d-9cf1-ede4-77906d7719d0 --- weed-fs/src/cmd/weedc.go | 59 ++++++++++++++++++++------- weed-fs/src/cmd/weeds.go | 24 ++++++----- weed-fs/src/pkg/directory/file_id.go | 15 +++---- weed-fs/src/pkg/storage/needle.go | 31 +++++++++++--- weed-fs/src/pkg/storage/needle_map.go | 58 +++++++++++++++++--------- weed-fs/src/pkg/storage/store.go | 9 ++++ weed-fs/src/pkg/storage/volume.go | 32 +++++++-------- 7 files changed, 151 insertions(+), 77 deletions(-) diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go index 00e750e02..6b8800d23 100644 --- a/weed-fs/src/cmd/weedc.go +++ b/weed-fs/src/cmd/weedc.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "http" + "json" "log" "mime" "strconv" @@ -18,10 +19,12 @@ var ( publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read") metaServer = flag.String("mserver", "localhost:9333", "metadata server to store mappings") - store *storage.Store + store *storage.Store ) - +func statusHandler(w http.ResponseWriter, r *http.Request) { + writeJson(w, r, store.Status()) +} func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": @@ -35,25 +38,52 @@ func storeHandler(w http.ResponseWriter, r *http.Request) { func GetHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) path := r.URL.Path - sepIndex := strings.Index(path[1:], "/") + 1 - volumeId, _ := strconv.Atoui64(path[1:sepIndex]) - dotIndex := strings.LastIndex(path, ".") - n.ParsePath(path[sepIndex+1 : dotIndex]) - ext := path[dotIndex:] + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + dotIndex := strings.LastIndex(path[sepIndex:], ".") + fid := path[commaIndex+1:] + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + } + volumeId, _ := strconv.Atoui64(path[sepIndex+1 : commaIndex]) + n.ParsePath(fid) store.Read(volumeId, n) - w.Header().Set("Content-Type", mime.TypeByExtension(ext)) + if dotIndex > 0 { + ext := path[dotIndex:] + w.Header().Set("Content-Type", mime.TypeByExtension(ext)) + } w.Write(n.Data) } func PostHandler(w http.ResponseWriter, r *http.Request) { - volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) - store.Write(volumeId, storage.NewNeedle(r)) - w.Header().Set("Content-Type", "text/plain") - fmt.Fprint(w, "volumeId=", volumeId, "\n") + path := r.URL.Path + commaIndex := strings.LastIndex(path, ",") + sepIndex := strings.LastIndex(path[:commaIndex], "/") + volumeId, e := strconv.Atoui64(path[sepIndex+1 : commaIndex]) + if e != nil { + writeJson(w, r, e) + } else { + store.Write(volumeId, storage.NewNeedle(r)) + writeJson(w, r, make(map[string]string)) + } } func DeleteHandler(w http.ResponseWriter, r *http.Request) { } +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + w.Header().Set("Content-Type", "application/javascript") + bytes, _ := json.Marshal(obj) + callback := r.FormValue("callback") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + fmt.Fprint(w, string(bytes)) + w.Write([]uint8(")")) + } + //log.Println("JSON Response", string(bytes)) +} func main() { flag.Parse() @@ -61,14 +91,15 @@ func main() { store = storage.NewStore(*port, *publicServer, *chunkFolder, 1024*1024*1024, *chunkCount) defer store.Close() http.HandleFunc("/", storeHandler) + http.HandleFunc("/status", statusHandler) store.Join(*metaServer) log.Println("store joined at", *metaServer) log.Println("Start storage service at http://127.0.0.1:" + strconv.Itoa(*port)) e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) - if e!=nil { - log.Fatalf("Fail to start:",e.String()) + if e != nil { + log.Fatalf("Fail to start:", e.String()) } } diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index c4abd0288..396f5f271 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -17,11 +17,15 @@ var ( metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") capacity = flag.Int("capacity", 100, "maximum number of volumes to hold") mapper *directory.Mapper - ) -func dirReadHandler(w http.ResponseWriter, r *http.Request) { - volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) +func dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, _ := strconv.Atoui64(vid) machine := mapper.Get(uint32(volumeId)) writeJson(w, r, machine.Server) } @@ -29,9 +33,9 @@ func dirWriteHandler(w http.ResponseWriter, r *http.Request) { _, machine := mapper.PickForWrite() writeJson(w, r, machine) } -func dirPickHandler(w http.ResponseWriter, r *http.Request) { - fid, machine := mapper.PickForWrite() - writeJson(w, r, map[string]string{"fid":fid,"url":machine.Url}) +func dirAssignHandler(w http.ResponseWriter, r *http.Request) { + fid, machine := mapper.PickForWrite() + writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url}) } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") @@ -44,9 +48,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, vids) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain") - bytes, _ := json.Marshal(mapper) - fmt.Fprint(w, string(bytes)) + writeJson(w, r, mapper) } func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") @@ -67,9 +69,9 @@ func main() { flag.Parse() mapper = directory.NewMapper(*metaFolder, "directory", *capacity) defer mapper.Save() - http.HandleFunc("/dir/read", dirReadHandler) + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) http.HandleFunc("/dir/write", dirWriteHandler) - http.HandleFunc("/dir/pick", dirPickHandler) http.HandleFunc("/dir/join", dirJoinHandler) http.HandleFunc("/dir/status", dirStatusHandler) diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go index 7cc91c3f6..a88daf752 100644 --- a/weed-fs/src/pkg/directory/file_id.go +++ b/weed-fs/src/pkg/directory/file_id.go @@ -2,6 +2,7 @@ package directory import ( "encoding/hex" + "log" "storage" "strconv" "strings" @@ -16,21 +17,15 @@ type FileId struct { func NewFileId(VolumeId uint32, Key uint64, Hashcode uint32) *FileId { return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} } - -func ParseFileId(path string) *FileId { - a := strings.Split(path, ",") +func ParseFileId(fid string) *FileId { + a := strings.Split(fid, ",") if len(a) != 2 { + log.Println("Invalid fid", fid, ", split length", len(a)) return nil } vid_string, key_hash_string := a[0], a[1] - key_hash_bytes, khe := hex.DecodeString(key_hash_string) - key_hash_len := len(key_hash_bytes) - if khe != nil || key_hash_len <= 4 { - return nil - } vid, _ := strconv.Atoui64(vid_string) - key := storage.BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) - hash := storage.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) + key, hash := storage.ParseKeyHash(key_hash_string) return &FileId{VolumeId: uint32(vid), Key: key, Hashcode: hash} } func (n *FileId) String() string { diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go index f60a15a5a..7b79faa78 100644 --- a/weed-fs/src/pkg/storage/needle.go +++ b/weed-fs/src/pkg/storage/needle.go @@ -1,6 +1,7 @@ package storage import ( + "encoding/hex" "io" "io/ioutil" "http" @@ -27,17 +28,24 @@ func NewNeedle(r *http.Request) (n *Needle) { data, _ := ioutil.ReadAll(part) n.Data = data - n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path, ".")]) + commaSep := strings.LastIndex(r.URL.Path, ",") + dotSep := strings.LastIndex(r.URL.Path, ".") + fid := r.URL.Path[commaSep+1:] + if dotSep > 0 { + fid = r.URL.Path[commaSep+1:dotSep] + } + + n.ParsePath(fid) return } -func (n *Needle) ParsePath(path string) { - if len(path) != 16 { +func (n *Needle) ParsePath(fid string) { + length := len(fid) + if length <= 8 { + log.Println("Invalid fid", fid, "length", length) return } - bytes := []byte(path) - n.Cookie = BytesToUint32(bytes[12:16]) - n.Key = BytesToUint64(bytes[4:12]) + n.Key, n.Cookie = ParseKeyHash(fid) } func (n *Needle) Append(w io.Writer) { header := make([]byte, 16) @@ -60,3 +68,14 @@ func (n *Needle) Read(r io.Reader, size uint32) { n.Data = bytes[16 : 16+size] n.Checksum = int32(BytesToUint32(bytes[16+size : 16+size+4])) } +func ParseKeyHash(key_hash_string string)(uint64,uint32) { + key_hash_bytes, khe := hex.DecodeString(key_hash_string) + key_hash_len := len(key_hash_bytes) + if khe != nil || key_hash_len <= 4 { + log.Println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe) + return 0, 0 + } + key := BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) + hash := BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) + return key, hash +} diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 3fbf1f6f5..ef7764174 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,42 +1,60 @@ package storage import ( + "log" "os" ) -type NeedleKey struct { - Key uint64 "file id" -} - -func (k *NeedleKey) String() string { - var tmp [12]byte - for i := uint(0); i < 8; i++ { - tmp[i] = byte(k.Key >> (8 * i)) - } - return string(tmp[:]) -} - type NeedleValue struct { Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G Size uint32 "Size of the data portion" } type NeedleMap struct { - m map[uint64]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue + indexFile *os.File + m map[uint64]*NeedleValue //mapping needle key(uint64) to NeedleValue + bytes []byte } func NewNeedleMap() *NeedleMap { - return &NeedleMap{m: make(map[uint64]*NeedleValue)} -} -func (nm *NeedleMap) load(file *os.File) { + return &NeedleMap{m: make(map[uint64]*NeedleValue), bytes: make([]byte, 16)} } -func makeKey(key uint64) uint64 { - return key + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) *NeedleMap { + nm := NewNeedleMap() + nm.indexFile = file + bytes := make([]byte, 16*RowsToRead) + count, e := nm.indexFile.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name, "size", fstat.Size) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := BytesToUint64(bytes[i : i+8]) + offset := BytesToUint32(bytes[i+8 : i+12]) + size := BytesToUint32(bytes[i+12 : i+16]) + nm.m[key] = &NeedleValue{Offset: offset, Size: size} + } + count, e = nm.indexFile.Read(bytes) + } + return nm } func (nm *NeedleMap) put(key uint64, offset uint32, size uint32) { - nm.m[makeKey(key)] = &NeedleValue{Offset: offset, Size: size} + nm.m[key] = &NeedleValue{Offset: offset, Size: size} + Uint64toBytes(nm.bytes[0:8], key) + Uint32toBytes(nm.bytes[8:12], offset) + Uint32toBytes(nm.bytes[12:16], size) + nm.indexFile.Write(nm.bytes) } func (nm *NeedleMap) get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m[makeKey(key)] + element, ok = nm.m[key] return } +func (nm *NeedleMap) Close() { + nm.indexFile.Close() +} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 44e4d252e..79575d1ae 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -42,6 +42,15 @@ func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) ( return } +func (s *Store) Status()(*[]*VolumeInfo){ + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size = uint32(k), v.Size() + *stats = append(*stats, s) + } + return stats +} func (s *Store) Join(mserver string) { stats := new([]*VolumeInfo) for k, v := range s.volumes { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 457a93b5b..99f7e40cb 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -8,27 +8,27 @@ import ( ) type Volume struct { - Id uint32 - dir string - dataFile, indexFile *os.File - nm *NeedleMap + Id uint32 + dir string + dataFile *os.File + nm *NeedleMap accessChannel chan int } func NewVolume(dirname string, id uint32) (v *Volume) { var e os.Error - v = &Volume{dir:dirname,Id:id, nm:NewNeedleMap()} + v = &Volume{dir: dirname, Id: id, nm: NewNeedleMap()} fileName := strconv.Uitoa64(uint64(v.Id)) - v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) + v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) } - v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) - if e != nil { - log.Fatalf("New Volume [ERROR] %s\n", e) + indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + log.Fatalf("New Volume [ERROR] %s\n", ie) } - v.nm.load(v.indexFile) + v.nm = LoadNeedleMap(indexFile) v.accessChannel = make(chan int, 1) v.accessChannel <- 0 @@ -36,16 +36,16 @@ func NewVolume(dirname string, id uint32) (v *Volume) { return } func (v *Volume) Size() int64 { - stat, e:=v.dataFile.Stat() - if e!=nil{ - return stat.Size - } - return -1 + stat, e := v.dataFile.Stat() + if e == nil { + return stat.Size + } + return -1 } func (v *Volume) Close() { close(v.accessChannel) + v.nm.Close() v.dataFile.Close() - v.indexFile.Close() } func (v *Volume) write(n *Needle) {