diff --git a/weed-fs/note/startup_process.txt b/weed-fs/note/startup_process.txt
new file mode 100644
index 000000000..558955005
--- /dev/null
+++ b/weed-fs/note/startup_process.txt
@@ -0,0 +1,7 @@
+1. clients report their own server info and volumes info
+2. master collects all volumes, separating them into readable volumes, writable volumes, and a volume-to-machine mapping
+   machines is an array of machine info
+   writable volumes is an array of vids
+   vid2machineId maps a volume id to a machineId, which is the index into the machines array
+
+
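The note above corresponds to the new Machine/Mapper structures in volume_mapping.go further down. As a rough illustration only (modern Go, with simplified and partly hypothetical names such as vid2machine and writableVids; the patch itself targets the pre-Go1 standard library and keeps machine indexes, not vids, in its writers slice), the master's in-memory directory could be sketched like this:

// Illustrative sketch only, not part of the patch: a modern-Go rendering of
// the directory described in note/startup_process.txt.
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

const chunkSizeLimit = 1 << 30 // 1 GiB, mirrors ChunkSizeLimit introduced below

type VolumeInfo struct {
	Id   uint64
	Size int64
}

type Machine struct {
	Server       string // address the master talks to
	PublicServer string // address handed back to clients
	Volumes      []VolumeInfo
}

type Mapper struct {
	mu           sync.Mutex
	Machines     []*Machine
	vid2machine  map[uint64]int // volume id -> index into Machines
	writableVids []uint64       // volumes still below the size limit
}

func NewMapper() *Mapper {
	return &Mapper{vid2machine: make(map[uint64]int)}
}

// Add registers a machine's volumes and rebuilds the lookup structures,
// roughly step 2 of the note ("master collects all volumes").
func (m *Mapper) Add(machine *Machine) {
	m.mu.Lock()
	defer m.mu.Unlock()
	idx := len(m.Machines)
	m.Machines = append(m.Machines, machine)
	for _, v := range machine.Volumes {
		m.vid2machine[v.Id] = idx
		if v.Size < chunkSizeLimit {
			m.writableVids = append(m.writableVids, v.Id)
		}
	}
}

// Get answers a read lookup: which machine holds this volume id?
func (m *Mapper) Get(vid uint64) *Machine {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.Machines[m.vid2machine[vid]]
}

// PickForWrite chooses a random writable volume and the machine holding it
// (assumes at least one writable volume exists).
func (m *Mapper) PickForWrite() (uint64, *Machine) {
	m.mu.Lock()
	defer m.mu.Unlock()
	vid := m.writableVids[rand.Intn(len(m.writableVids))]
	return vid, m.Machines[m.vid2machine[vid]]
}

func main() {
	m := NewMapper()
	m.Add(&Machine{
		Server:       "127.0.0.1:8080",
		PublicServer: "localhost:8080",
		Volumes:      []VolumeInfo{{Id: 0, Size: 0}, {Id: 1, Size: 2 << 30}},
	})
	fmt.Println("writable:", m.writableVids)     // only volume 0 is below the limit
	fmt.Println("volume 1 on:", m.Get(1).Server) // 127.0.0.1:8080
	vid, machine := m.PickForWrite()
	fmt.Println("write to volume", vid, "on", machine.PublicServer)
}

Keeping the writable set as a flat slice is what lets PickForWrite stay a single rand.Intn call, which is also how the patched Mapper.PickForWrite works against its writers slice.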
diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go
index b888c6c29..c639d48d3 100644
--- a/weed-fs/src/cmd/weeds.go
+++ b/weed-fs/src/cmd/weeds.go
@@ -8,7 +8,6 @@ import (
 	"http"
 	"json"
 	"log"
-	"rand"
 	"strconv"
 	"strings"
 )
@@ -22,20 +21,8 @@ var (
 
 func dirReadHandler(w http.ResponseWriter, r *http.Request) {
 	volumeId, _ := strconv.Atoi(r.FormValue("volumeId"))
-	machineList := mapper.Get(volumeId)
-	x := rand.Intn(len(machineList))
-	machine := machineList[x]
-	bytes, _ := json.Marshal(machine)
-	callback := r.FormValue("callback")
-	w.Header().Set("Content-Type", "application/javascript")
-	if callback == "" {
-		w.Write(bytes)
-	} else {
-		w.Write([]uint8(callback))
-		w.Write([]uint8("("))
-		w.Write(bytes)
-		w.Write([]uint8(")"))
-	}
+	machine := mapper.Get(volumeId)
+	writeJson(w, r, machine)
 }
 func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
 	machineList := mapper.PickForWrite()
@@ -44,17 +31,17 @@ func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
 func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
 	s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
 	publicServer := r.FormValue("publicServer")
-	volumes := new([]storage.VolumeStat)
+	volumes := new([]storage.VolumeInfo)
 	json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
 	capacity, _ := strconv.Atoi(r.FormValue("capacity"))
 	log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes"))
-	vids := mapper.Add(*directory.NewMachine(s, publicServer), *volumes, capacity)
+	vids := mapper.Add(*directory.NewMachine(s, publicServer, *volumes, capacity))
 	writeJson(w, r, vids)
 }
 func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/plain")
 	bytes, _ := json.Marshal(mapper)
-	fmt.Fprint(w, bytes)
+	fmt.Fprint(w, string(bytes))
 }
 func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
 	w.Header().Set("Content-Type", "application/javascript")
@@ -65,7 +52,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
 	} else {
 		w.Write([]uint8(callback))
 		w.Write([]uint8("("))
-		w.Write(bytes)
+		fmt.Fprint(w, string(bytes))
 		w.Write([]uint8(")"))
 	}
 	log.Println("JSON Response", string(bytes))
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index e9a0647e5..4062b3109 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -7,24 +7,36 @@ import (
 	"rand"
 	"log"
 	"storage"
+	"sync"
+)
+
+const (
+	ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
 )
 
 type Machine struct {
 	Server       string //[:port]
 	PublicServer string
-	CanWrite     bool
+	Volumes      []storage.VolumeInfo
+	Capacity     int
 }
+
 type Mapper struct {
 	dir      string
 	fileName string
 	capacity int
-	Machines [][]Machine //initial version only support one copy per machine
-	writers  [][]Machine // transient value to lookup writers fast
+
+	lock sync.Mutex
+
+	Machines      []*Machine
+	vid2machineId map[uint64]int
+	writers       []int // transient array: one machine index per writable volume
+
+	GlobalVolumeSequence uint64
 }
 
-func NewMachine(server, publicServer string) (m *Machine) {
+func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) {
 	m = new(Machine)
-	m.Server, m.PublicServer = server, publicServer
+	m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity
 	return
 }
@@ -33,74 +45,90 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
 	m.dir, m.fileName, m.capacity = dirname, filename, capacity
 	log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
 	dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
-	m.Machines = *new([][]Machine)
-	m.writers = *new([][]Machine)
+	m.vid2machineId = make(map[uint64]int)
+	m.writers = *new([]int)
 	if e != nil {
 		log.Println("Mapping File Read", e)
+		m.Machines = *new([]*Machine)
 	} else {
 		decoder := gob.NewDecoder(dataFile)
-		defer dataFile.Close()
+		defer dataFile.Close()
 		decoder.Decode(&m.Machines)
-		for _, list := range m.Machines {
-			//TODO: what if a list has mixed readers and writers? Now it's treated as readonly
-			allCanWrite := false
-			for _, entry := range list {
-				allCanWrite = allCanWrite && entry.CanWrite
-			}
-			if allCanWrite {
-				m.writers = append(m.writers, list)
+		decoder.Decode(&m.GlobalVolumeSequence)
+
+		//add to vid2machineId map, and writers array
+		for machine_index, machine := range m.Machines {
+			for _, v := range machine.Volumes {
+				m.vid2machineId[v.Id] = machine_index
+				if v.Size < ChunkSizeLimit {
+					m.writers = append(m.writers, machine_index)
+				}
 			}
 		}
-		log.Println("Loaded mapping size", len(m.Machines))
+		log.Println("Loaded mapping size", len(m.Machines))
 	}
 	return
 }
-func (m *Mapper) PickForWrite() []Machine {
-	vid := rand.Intn(len(m.Machines))
-	return m.Machines[vid]
+func (m *Mapper) PickForWrite() *Machine {
+	vid := rand.Intn(len(m.writers))
+	return m.Machines[m.writers[vid]]
 }
-func (m *Mapper) Get(vid int) []Machine {
-	return m.Machines[vid]
+func (m *Mapper) Get(vid int) *Machine {
+	return m.Machines[m.vid2machineId[uint64(vid)]]
 }
-func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int {
-	log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines))
-	log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines))
-	maxId := len(m.Machines)-1
-	for _, v := range volumes {
-		if maxId < int(v.Id) {
-			maxId = int(v.Id)
+func (m *Mapper) Add(machine Machine) []uint64 {
+	log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines))
+	log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines))
+	//check existing machine, linearly
+	m.lock.Lock()
+	foundExistingMachineId := -1
+	for index, entry := range m.Machines {
+		if machine.Server == entry.Server {
+			foundExistingMachineId = index
+			break
 		}
 	}
-	for i := len(m.Machines); i <= maxId; i++ {
-		m.Machines = append(m.Machines, nil)
-	}
-	log.Println("Machine list now is", len(m.Machines))
-	for _, v := range volumes {
-		found := false
-		existing := m.Machines[v.Id]
-		for _, entry := range existing {
-			if machine.Server == entry.Server {
-				found = true
-				break
-			}
-		}
-		if !found {
-			m.Machines[v.Id] = append(existing, machine)
-			log.Println("Setting volume", v.Id, "to", machine.Server)
-		}
+	machineId := foundExistingMachineId
+	if machineId < 0 {
+		machineId = len(m.Machines)
+		m.Machines = append(m.Machines, &machine)
 	}
-	vids := new([]int)
-	for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 {
-		list := new([]Machine)
-		*list = append(*list, machine)
-		m.Machines = append(m.Machines, *list)
+	//generate new volumes
+	vids := new([]uint64)
+	for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 {
+		newVolume := *new(storage.VolumeInfo)
+		newVolume.Id, newVolume.Size = vid, 0
+		machine.Volumes = append(machine.Volumes, newVolume)
+		m.vid2machineId[vid] = machineId
 		log.Println("Adding volume", vid, "from", machine.Server)
 		*vids = append(*vids, vid)
+		m.GlobalVolumeSequence = vid + 1
 	}
 	m.Save()
-	log.Println("Dir size =>", len(m.Machines))
+	m.lock.Unlock()
+
+	//add to vid2machineId map, and writers array
+	for _, v := range machine.Volumes {
+		log.Println("Setting volume", v.Id, "to", machine.Server)
+		m.vid2machineId[v.Id] = machineId
+		if v.Size < ChunkSizeLimit {
+			m.writers = append(m.writers, machineId)
+		}
+	}
+	//setting writers, copy-on-write because of possible updating
+	var writers []int
+	for machine_index, machine_entry := range m.Machines {
+		for _, v := range machine_entry.Volumes {
+			if v.Size < ChunkSizeLimit {
+				writers = append(writers, machine_index)
+			}
+		}
+	}
+	m.writers = writers
+
+	log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.writers))
 	return *vids
 }
 func (m *Mapper) Save() {
@@ -112,4 +140,5 @@ func (m *Mapper) Save() {
 	defer dataFile.Close()
 	encoder := gob.NewEncoder(dataFile)
 	encoder.Encode(m.Machines)
+	encoder.Encode(m.GlobalVolumeSequence)
 }
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index e13f47257..ff6944853 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -16,9 +16,9 @@ type Store struct {
 	Port         int
 	PublicServer string
 }
-type VolumeStat struct {
-	Id       uint64 "id"
-	CanWrite bool
+type VolumeInfo struct {
+	Id   uint64
+	Size int64
 }
 
 func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) {
@@ -44,10 +44,10 @@ func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (
 }
 
 func (s *Store) Join(mserver string) {
-	stats := new([]*VolumeStat)
-	for k, _ := range s.volumes {
-		s := new(VolumeStat)
-		s.Id, s.CanWrite = k, true
+	stats := new([]*VolumeInfo)
+	for k, v := range s.volumes {
+		s := new(VolumeInfo)
+		s.Id, s.Size = k, v.Size()
 		*stats = append(*stats, s)
 	}
 	bytes, _ := json.Marshal(stats)
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 3acd2d0e3..4b7f6ab3a 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -38,12 +38,12 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
 	return
 }
 
-func (v *Volume) CanWrite(limit int64) bool {
+func (v *Volume) Size() int64 {
 	stat, e:=v.dataFile.Stat()
-	if e!=nil{
-		return stat.Size < limit
+	if e == nil {
+		return stat.Size
 	}
-	return false
+	return -1
 }
 func (v *Volume) Close() {
 	close(v.accessChannel)
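For step 1 of the note (a volume server reporting its port, public address, capacity, and current volumes), this patch only shows the receiving side (dirJoinHandler) and the payload that Store.Join serializes. Below is a minimal sketch of what the sending side could look like, written in modern Go; the master address, default port 9333, and the /dir/join path are assumptions that do not appear in this diff:

// Illustrative sketch only, not part of the patch: a volume server reporting
// itself to the master, matching the form fields dirJoinHandler reads.
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"net/url"
	"strconv"
)

type VolumeInfo struct {
	Id   uint64
	Size int64
}

func join(masterAddr, publicServer string, port, capacity int, volumes []VolumeInfo) error {
	payload, err := json.Marshal(volumes)
	if err != nil {
		return err
	}
	values := url.Values{}
	values.Set("port", strconv.Itoa(port))
	values.Set("publicServer", publicServer)
	values.Set("capacity", strconv.Itoa(capacity))
	values.Set("volumes", string(payload))
	// The "/dir/join" path is an assumption; the diff does not show how the
	// master registers its handlers.
	resp, err := http.PostForm("http://"+masterAddr+"/dir/join", values)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// The master answers with the ids of any newly assigned volumes.
	var newVids []uint64
	return json.NewDecoder(resp.Body).Decode(&newVids)
}

func main() {
	vols := []VolumeInfo{{Id: 0, Size: 12345}}
	if err := join("localhost:9333", "localhost:8080", 8080, 7, vols); err != nil {
		log.Println("join failed:", err)
	}
}

Note that dirJoinHandler derives the volume server's address from r.RemoteAddr plus the reported port, so the joining server only needs to send its port, not its own IP.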