diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go index 94032a971..00e750e02 100644 --- a/weed-fs/src/cmd/weedc.go +++ b/weed-fs/src/cmd/weedc.go @@ -57,7 +57,8 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { func main() { flag.Parse() - store = storage.NewStore(*port, *publicServer, *chunkFolder) + //TODO: now default to 1G, this value should come from server? + store = storage.NewStore(*port, *publicServer, *chunkFolder, 1024*1024*1024, *chunkCount) defer store.Close() http.HandleFunc("/", storeHandler) diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index ad423b71e..b888c6c29 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -10,17 +10,19 @@ import ( "log" "rand" "strconv" + "strings" ) var ( - port = flag.Int("port", 9333, "http listen port") - metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") - mapper *directory.Mapper + port = flag.Int("port", 9333, "http listen port") + metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") + capacity = flag.Int("capacity", 100, "maximum number of volumes to hold") + mapper *directory.Mapper ) func dirReadHandler(w http.ResponseWriter, r *http.Request) { - volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) - machineList := mapper.Get((uint32)(volumeId)) + volumeId, _ := strconv.Atoi(r.FormValue("volumeId")) + machineList := mapper.Get(volumeId) x := rand.Intn(len(machineList)) machine := machineList[x] bytes, _ := json.Marshal(machine) @@ -37,9 +39,27 @@ func dirReadHandler(w http.ResponseWriter, r *http.Request) { } func dirWriteHandler(w http.ResponseWriter, r *http.Request) { machineList := mapper.PickForWrite() - bytes, _ := json.Marshal(machineList) - callback := r.FormValue("callback") + writeJson(w, r, machineList) +} +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicServer := r.FormValue("publicServer") + volumes := new([]storage.VolumeStat) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + capacity, _ := strconv.Atoi(r.FormValue("capacity")) + log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes")) + vids := mapper.Add(*directory.NewMachine(s, publicServer), *volumes, capacity) + writeJson(w, r, vids) +} +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + bytes, _ := json.Marshal(mapper) + fmt.Fprint(w, bytes) +} +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { w.Header().Set("Content-Type", "application/javascript") + bytes, _ := json.Marshal(obj) + callback := r.FormValue("callback") if callback == "" { w.Write(bytes) } else { @@ -48,33 +68,22 @@ func dirWriteHandler(w http.ResponseWriter, r *http.Request) { w.Write(bytes) w.Write([]uint8(")")) } -} -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - s := r.FormValue("server") - publicServer := r.FormValue("publicServer") - volumes := make([]storage.VolumeStat, 0) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - mapper.Add(directory.NewMachine(s, publicServer), volumes) -} -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain") - bytes, _ := json.Marshal(mapper) - fmt.Fprint(w, bytes) + log.Println("JSON Response", string(bytes)) } func main() { flag.Parse() - mapper = directory.NewMapper(*metaFolder, "directory") + mapper = directory.NewMapper(*metaFolder, "directory", *capacity) defer mapper.Save() http.HandleFunc("/dir/read", dirReadHandler) http.HandleFunc("/dir/write", dirWriteHandler) http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) + http.HandleFunc("/dir/status", dirStatusHandler) log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port)) e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) - if e!=nil { - log.Fatalf("Fail to start:",e.String()) - } + if e != nil { + log.Fatalf("Fail to start:", e.String()) + } } diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 459d7f2b1..ffc90ec7d 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -12,12 +12,14 @@ import ( type Machine struct { Server string //[:port] PublicServer string + CanWrite bool } type Mapper struct { - dir string - FileName string - Id2Machine map[uint32][]*Machine - LastId uint32 + dir string + fileName string + capacity int + Machines [][]Machine //initial version only support one copy per machine + writers [][]Machine // transient value to lookup writers fast } func NewMachine(server, publicServer string) (m *Machine) { @@ -26,57 +28,88 @@ func NewMachine(server, publicServer string) (m *Machine) { return } -func NewMapper(dirname string, filename string) (m *Mapper) { +func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { m = new(Mapper) - m.dir = dirname - m.FileName = filename - log.Println("Loading virtual to physical:", path.Join(m.dir,m.FileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir,m.FileName+".map"), os.O_RDONLY, 0644) - m.Id2Machine = make(map[uint32][]*Machine) + m.dir, m.fileName, m.capacity = dirname, filename, capacity + log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) + dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) + m.Machines = *new([][]Machine) + m.writers = *new([][]Machine) if e != nil { log.Println("Mapping File Read", e) } else { decoder := gob.NewDecoder(dataFile) - decoder.Decode(m.LastId) - decoder.Decode(m.Id2Machine) + decoder.Decode(m.Machines) + for _, list := range m.Machines { + //TODO: what if a list has mixed readers and writers? Now it's treated as readonly + allCanWrite := false + for _, entry := range list { + allCanWrite = allCanWrite && entry.CanWrite + } + if allCanWrite { + m.writers = append(m.writers, list) + } + } dataFile.Close() + log.Println("Loaded mapping size", len(m.Machines)) } return } -func (m *Mapper) PickForWrite() []*Machine { - vid := uint32(rand.Intn(len(m.Id2Machine))) - return m.Id2Machine[vid] +func (m *Mapper) PickForWrite() []Machine { + vid := rand.Intn(len(m.Machines)) + return m.Machines[vid] } -func (m *Mapper) Get(vid uint32) []*Machine { - return m.Id2Machine[vid] +func (m *Mapper) Get(vid int) []Machine { + return m.Machines[vid] } -func (m *Mapper) Add(machine *Machine, volumes []storage.VolumeStat) { - log.Println("Adding store node", machine.Server) +func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int { + log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines)) + log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines)) + maxId := len(m.Machines)-1 + for _, v := range volumes { + if maxId < int(v.Id) { + maxId = int(v.Id) + } + } + for i := len(m.Machines); i <= maxId; i++ { + m.Machines = append(m.Machines, nil) + } + log.Println("Machine list now is", len(m.Machines)) for _, v := range volumes { - existing := m.Id2Machine[uint32(v.Id)] found := false + existing := m.Machines[v.Id] for _, entry := range existing { - if machine == entry { + if machine.Server == entry.Server { found = true break } } if !found { - m.Id2Machine[uint32(v.Id)] = append(existing, machine) + m.Machines[v.Id] = append(existing, machine) + log.Println("Setting volume", v.Id, "to", machine.Server) } - log.Println(v.Id, "=>", machine.Server) } + + vids := new([]int) + for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 { + list := new([]Machine) + *list = append(*list, machine) + m.Machines = append(m.Machines, *list) + log.Println("Adding volume", vid, "from", machine.Server) + *vids = append(*vids, vid) + } + m.Save() + log.Println("Dir size =>", len(m.Machines)) + return *vids } func (m *Mapper) Save() { - log.Println("Saving virtual to physical:", path.Join(m.dir,m.FileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir,m.FileName+".map"), os.O_CREATE|os.O_WRONLY, 0644) + log.Println("Saving virtual to physical:", path.Join(m.dir, m.fileName+".map")) + dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_CREATE|os.O_WRONLY, 0644) if e != nil { log.Fatalf("Mapping File Save [ERROR] %s\n", e) } defer dataFile.Close() - m.Id2Machine = make(map[uint32][]*Machine) encoder := gob.NewEncoder(dataFile) - encoder.Encode(m.LastId) - encoder.Encode(m.Id2Machine) + encoder.Encode(m.Machines) } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 201f50528..e13f47257 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -11,50 +11,64 @@ import ( type Store struct { volumes map[uint64]*Volume + capacity int dir string - Port int + Port int PublicServer string } type VolumeStat struct { - Id uint64 "id" - Status int "status" //0:read, 1:write + Id uint64 "id" + CanWrite bool } -func NewStore(port int, publicServer, dirname string) (s *Store) { +func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { s = new(Store) - s.Port, s.PublicServer, s.dir = port, publicServer, dirname + s.Port, s.PublicServer, s.dir, s.capacity = port, publicServer, dirname, capacity s.volumes = make(map[uint64]*Volume) - counter := uint64(0) files, _ := ioutil.ReadDir(dirname) for _, f := range files { if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { continue } - id, err := strconv.Atoui64(f.Name[:-4]) - if err == nil { + id, err := strconv.Atoui64(f.Name[0:(strings.LastIndex(f.Name, ".dat"))]) + log.Println("Loading data file name:", f.Name) + if err != nil { continue } - s.volumes[counter] = NewVolume(s.dir, id) - counter++ + s.volumes[id] = NewVolume(s.dir, id) } - log.Println("Store started on dir:", dirname, "with", counter, "existing volumes") + log.Println("Store started on dir:", dirname, "with", len(s.volumes), "existing volumes") + log.Println("Expected capacity=", s.capacity, "volumes") return } func (s *Store) Join(mserver string) { - stats := make([]*VolumeStat, len(s.volumes)) + stats := new([]*VolumeStat) for k, _ := range s.volumes { s := new(VolumeStat) - s.Id, s.Status = k, 1 - stats = append(stats, s) + s.Id, s.CanWrite = k, true + *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) values := make(url.Values) values.Add("port", strconv.Itoa(s.Port)) - values.Add("publicServer", s.PublicServer) + values.Add("publicServer", s.PublicServer) values.Add("volumes", string(bytes)) - post("http://"+mserver+"/dir/join", values) + log.Println("Registering exiting volumes", string(bytes)) + values.Add("capacity", strconv.Itoa(s.capacity)) + retString := post("http://"+mserver+"/dir/join", values) + if retString != nil { + newVids := new([]int) + log.Println("Instructed to create volume",string(retString)) + e := json.Unmarshal(retString, newVids) + if e == nil { + for _, vid := range *newVids { + s.volumes[uint64(vid)] = NewVolume(s.dir, uint64(vid)) + log.Println("Adding volume", vid) + } + } + } } func (s *Store) Close() { for _, v := range s.volumes { diff --git a/weed-fs/src/pkg/storage/util.go b/weed-fs/src/pkg/storage/util.go index 3315dbe01..58a415bc0 100644 --- a/weed-fs/src/pkg/storage/util.go +++ b/weed-fs/src/pkg/storage/util.go @@ -34,17 +34,17 @@ func uint32toBytes(b []byte, v uint32){ } } -func post(url string, values url.Values)string{ +func post(url string, values url.Values)[]byte{ r, err := http.PostForm(url, values) if err != nil { log.Println("post:", err) - return "" + return nil } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { log.Println("post:", err) - return "" + return nil } - return string(b) + return b } \ No newline at end of file diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 6b6db1ba2..3acd2d0e3 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -22,7 +22,6 @@ func NewVolume(dirname string, id uint64) (v *Volume) { v.dir = dirname v.Id = id fileName := strconv.Uitoa64(v.Id) - log.Println("file", v.dir, "/", fileName) v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) @@ -39,6 +38,13 @@ func NewVolume(dirname string, id uint64) (v *Volume) { return } +func (v *Volume) CanWrite(limit int64) bool { + stat, e:=v.dataFile.Stat() + if e!=nil{ + return stat.Size < limit + } + return false +} func (v *Volume) Close() { close(v.accessChannel) v.dataFile.Close()