diff --git a/weed-fs/note/weedfs.txt b/weed-fs/note/weedfs.txt new file mode 100644 index 000000000..d07ad9484 --- /dev/null +++ b/weed-fs/note/weedfs.txt @@ -0,0 +1,46 @@ +How to submit a content +1. Find physical volumes +1.c Create a hash value +1.d find a write logic volume id, and return [logic volume id, {physical volume ids}] +2. submit to physical volumes +2.c + generate the cookie + generate a unique id as key + choose the right altKey + send bytes to physical volumes +2.s each + save bytes + store map[key uint64, altKey uint32] + for updated entry, set old entry's offset to zero +3.c + wait for all physical volumes to finish + store the //__. + +How to retrieve a content +1.c + send logic volume id +1.d + find least busy volume's id +2.c + send URI //__. + + +How to submit a content +1. send bytes to weedfs, got + store , and other information + +To read a content +2. use logic volume id to lookup a + render url as ////.ext + +The directory server +0.init + load and collect mapping +1.on submit content + find a free logic volume id, start sending content to 3 machines + if all of them finishes, return +2.on read content + based on logic volume id, pick a machine with less load, + return + + diff --git a/weed-fs/src/cmd/gdir.go b/weed-fs/src/cmd/gdir.go deleted file mode 100644 index 34c5733be..000000000 --- a/weed-fs/src/cmd/gdir.go +++ /dev/null @@ -1,18 +0,0 @@ -package main - -import ( - "directory" - // "runtime" - "log" -) - -func main() { - m := directory.NewMapper("/tmp", "directory") - log.Println("map size", len(m.Virtual2physical)) - m.Add(10, 11,12,13) - m.Add(20, 21,22,23) - log.Println("map(10)", m.Get(10)) - log.Println("map size", len(m.Virtual2physical)) - m.Save() - defer m.Save() -} diff --git a/weed-fs/src/cmd/gstore.go b/weed-fs/src/cmd/gstore.go index 8bd240902..a86e30373 100644 --- a/weed-fs/src/cmd/gstore.go +++ b/weed-fs/src/cmd/gstore.go @@ -6,9 +6,11 @@ import ( "flag" "fmt" "http" + "json" "log" "mime" "os" + "rand" "strconv" "strings" ) @@ -18,6 +20,10 @@ var ( chunkFolder = flag.String("dir", "/tmp", "data directory to store files") chunkCount = flag.Int("chunks", 5, "data chunks to store files") chunkEnabled = flag.Bool("data", false, "act as a store server") + chunkServer = flag.String("cserver", "localhost:8080", "chunk server to store data") + publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read") + metaServer = flag.String("mserver", "localhost:8080", "metadata server to store mappings") + metaEnabled = flag.Bool("meta", false, "act as a directory server") metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") ) @@ -41,7 +47,7 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) { n := new(store.Needle) path := r.URL.Path sepIndex := strings.Index(path[1:], "/") + 1 - volumeId, _ := strconv.Atoi(path[1:sepIndex]) + volumeId, _ := strconv.Atoui64(path[1:sepIndex]) dotIndex := strings.LastIndex(path, ".") n.ParsePath(path[sepIndex+1 : dotIndex]) ext := path[dotIndex:] @@ -51,14 +57,47 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) { w.Write(n.Data) } func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) { - volumeId := s.store.Write(store.NewNeedle(r)) + volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) + s.store.Write(volumeId, store.NewNeedle(r)) w.Header().Set("Content-Type", "text/plain") fmt.Fprint(w, "volumeId=", volumeId, "\n") } func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) { } -func directoryHandler(w http.ResponseWriter, r *http.Request) { +func dirReadHandler(w http.ResponseWriter, r *http.Request) { + volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) + machineList := server.directory.Get((uint32)(volumeId)) + x := rand.Intn(len(machineList)) + machine := machineList[x] + bytes, _ := json.Marshal(machine) + callback := r.FormValue("callback") + w.Header().Set("Content-Type", "application/javascript") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + w.Write(bytes) + w.Write([]uint8(")")) + } +} +func dirWriteHandler(w http.ResponseWriter, r *http.Request) { + machineList := server.directory.PickForWrite() + bytes, _ := json.Marshal(machineList) + callback := r.FormValue("callback") + w.Header().Set("Content-Type", "application/javascript") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + w.Write(bytes) + w.Write([]uint8(")")) + } +} +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + } var server *Haystack @@ -66,22 +105,24 @@ var server *Haystack func main() { flag.Parse() if !*chunkEnabled && !*metaEnabled { - fmt.Fprintf(os.Stderr, "Need to act as either a store server or a directory server, or both\n") - flag.PrintDefaults() - os.Exit(-1) + fmt.Fprintf(os.Stdout, "Act as both a store server and a directory server\n") } server = new(Haystack) if *chunkEnabled { fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder) - server.store = store.NewStore(*chunkFolder, *chunkCount) + server.store = store.NewStore(*chunkServer, *publicServer, *chunkFolder) defer server.store.Close() http.HandleFunc("/", storeHandler) } if *metaEnabled { server.directory = directory.NewMapper(*metaFolder, "directory") defer server.directory.Save() - http.HandleFunc("/directory", directoryHandler) + http.HandleFunc("/dir/read", dirReadHandler) + http.HandleFunc("/dir/write", dirWriteHandler) + http.HandleFunc("/dir/join", dirJoinHandler) } + + server.store.Join(*metaServer) log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port)) http.ListenAndServe(":"+strconv.Itoa(*port), nil) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 60fef7d7d..963145153 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -3,13 +3,17 @@ package directory import ( "gob" "os" + "rand" "log" ) +type Machine struct { + Server string //[:port] +} type Mapper struct { dir string fileName string - Virtual2physical map[uint32][]uint32 + Virtual2physical map[uint32][]Machine } func NewMapper(dirname string, filename string) (m *Mapper) { @@ -21,17 +25,21 @@ func NewMapper(dirname string, filename string) (m *Mapper) { if e != nil { log.Fatalf("Mapping File Read [ERROR] %s\n", e) } else { - m.Virtual2physical = make(map[uint32][]uint32) + m.Virtual2physical = make(map[uint32][]Machine) decoder := gob.NewDecoder(dataFile) decoder.Decode(m.Virtual2physical) dataFile.Close() } return } -func (m *Mapper) Get(vid uint32) []uint32 { +func (m *Mapper) PickForWrite() []Machine { + vid := uint32(rand.Intn(len(m.Virtual2physical))) + return m.Virtual2physical[vid] +} +func (m *Mapper) Get(vid uint32) []Machine { return m.Virtual2physical[vid] } -func (m *Mapper) Add(vid uint32, pids ...uint32) { +func (m *Mapper) Add(vid uint32, pids ...Machine) { m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...) } func (m *Mapper) Save() { @@ -41,7 +49,7 @@ func (m *Mapper) Save() { log.Fatalf("Mapping File Save [ERROR] %s\n", e) } defer dataFile.Close() - m.Virtual2physical = make(map[uint32][]uint32) + m.Virtual2physical = make(map[uint32][]Machine) encoder := gob.NewEncoder(dataFile) encoder.Encode(m.Virtual2physical) } diff --git a/weed-fs/src/pkg/store/store.go b/weed-fs/src/pkg/store/store.go index a568fb199..c287aaaa8 100644 --- a/weed-fs/src/pkg/store/store.go +++ b/weed-fs/src/pkg/store/store.go @@ -1,39 +1,69 @@ package store import ( - "log" - "strconv" + "log" + "io/ioutil" + "json" + "strings" + "strconv" + "url" ) -type Store struct{ - volumes []*Volume - dir string - - freeVolumeChannel chan int + +type Store struct { + volumes map[uint64]*Volume + dir string + Server string + PublicServer string +} +type VolumeStat struct { + Id uint64 "id" + Status int "status" //0:read, 1:write +} + +func NewStore(server, publicServer, dirname string) (s *Store) { + s = new(Store) + s.Server, s.PublicServer, s.dir = server, publicServer, dirname + s.volumes = make(map[uint64]*Volume) + + counter := uint64(0) + files, _ := ioutil.ReadDir(dirname) + for _, f := range files { + if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { + continue + } + id, err := strconv.Atoui64(f.Name[:-4]) + if err == nil { + continue + } + s.volumes[counter] = NewVolume(s.dir, id) + counter++ + } + log.Println("Store started on dir:", dirname, "with", counter, "existing volumes") + return } -func NewStore(dirname string, count int) (s *Store){ - s = new(Store) - s.dir = dirname - s.volumes = make([]*Volume,count) - s.freeVolumeChannel = make(chan int, count) - for i:=0;i0;i-- { v += uint64(b[i]) @@ -26,3 +33,18 @@ func uint32toBytes(b []byte, v uint32){ b[i] = byte(v>>(i*8)) } } + +func post(url string, values url.Values)string{ + r, err := http.PostForm(url, values) + if err != nil { + log.Println("post:", err) + return "" + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("post:", err) + return "" + } + return string(b) +} \ No newline at end of file diff --git a/weed-fs/src/pkg/store/volume.go b/weed-fs/src/pkg/store/volume.go index 33b010e62..c32f4af10 100644 --- a/weed-fs/src/pkg/store/volume.go +++ b/weed-fs/src/pkg/store/volume.go @@ -2,29 +2,32 @@ package store import ( "os" + "path" + "strconv" "log" ) type Volume struct { + Id uint64 dir string - fileName string dataFile, indexFile *os.File nm *NeedleMap accessChannel chan int } -func NewVolume(dirname string, filename string) (v *Volume) { +func NewVolume(dirname string, id uint64) (v *Volume) { var e os.Error v = new(Volume) v.dir = dirname - v.fileName = filename - log.Println("file", v.dir, "/", v.fileName) - v.dataFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.Id = id + fileName := strconv.Uitoa64(v.Id) + log.Println("file", v.dir, "/", fileName) + v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) } - v.indexFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) }