diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index ba134380c..4b3792687 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -22,11 +22,15 @@ var ( func dirReadHandler(w http.ResponseWriter, r *http.Request) { volumeId, _ := strconv.Atoui64(r.FormValue("volumeId")) machine := mapper.Get(volumeId) - writeJson(w, r, machine) + writeJson(w, r, machine.Server) } func dirWriteHandler(w http.ResponseWriter, r *http.Request) { - machineList := mapper.PickForWrite() - writeJson(w, r, machineList) + machine := mapper.PickForWrite() + writeJson(w, r, machine) +} +func dirPickHandler(w http.ResponseWriter, r *http.Request) { + machine := mapper.PickForWrite() + writeJson(w, r, machine) } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") @@ -64,6 +68,7 @@ func main() { defer mapper.Save() http.HandleFunc("/dir/read", dirReadHandler) http.HandleFunc("/dir/write", dirWriteHandler) + http.HandleFunc("/dir/pick", dirPickHandler) http.HandleFunc("/dir/join", dirJoinHandler) http.HandleFunc("/dir/status", dirStatusHandler) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index b24b1747e..ff5b0bf64 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -15,15 +15,13 @@ const ( ) type MachineInfo struct { - Server string //[:port] - PublicServer string + Url string //[:port] + PublicUrl string } type Machine struct { - MachineInfo - Server string //[:port] - PublicServer string - Volumes []storage.VolumeInfo - Capacity int + Server MachineInfo + Volumes []storage.VolumeInfo + Capacity int } type Mapper struct { @@ -39,19 +37,16 @@ type Mapper struct { GlobalVolumeSequence uint64 } -func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) { - m = new(Machine) - m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity - return +func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine { + return &Machine{Server:MachineInfo{Url:server,PublicUrl:publicServer},Volumes:volumes,Capacity:capacity} } func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { - m = new(Mapper) - m.dir, m.fileName, m.capacity = dirname, filename, capacity + m = &Mapper{dir:dirname,fileName:filename,capacity:capacity} log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) - m.vid2machineId = make(map[uint64]int) - m.Writers = *new([]int) + m.vid2machineId = make(map[uint64]int) + m.Writers = *new([]int) if e != nil { log.Println("Mapping File Read", e) m.Machines = *new([]*Machine) @@ -64,7 +59,7 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { //add to vid2machineId map, and Writers array for machine_index, machine := range m.Machines { for _, v := range machine.Volumes { - m.vid2machineId[v.Id] = machine_index + m.vid2machineId[v.Id] = machine_index if v.Size < ChunkSizeLimit { m.Writers = append(m.Writers, machine_index) } @@ -74,24 +69,21 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { } return } -func (m *Mapper) PickForWrite() map[string]string { +func (m *Mapper) PickForWrite() MachineInfo { vid := rand.Intn(len(m.Writers)) - return map[string]string{ - "server":m.Machines[m.Writers[vid]].Server, - "url":m.Machines[m.Writers[vid]].PublicServer, - } + return m.Machines[m.Writers[vid]].Server } func (m *Mapper) Get(vid uint64) *Machine { return m.Machines[m.vid2machineId[vid]] } func (m *Mapper) Add(machine Machine) []uint64 { - log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines)) - log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines)) + log.Println("Adding existing", machine.Server.Url, len(machine.Volumes), "volumes to dir", len(m.Machines)) + log.Println("Adding new ", machine.Server.Url, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines)) //check existing machine, linearly m.lock.Lock() foundExistingMachineId := -1 for index, entry := range m.Machines { - if machine.Server == entry.Server { + if machine.Server.Url == entry.Server.Url { foundExistingMachineId = index break } @@ -105,11 +97,10 @@ func (m *Mapper) Add(machine Machine) []uint64 { //generate new volumes vids := new([]uint64) for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 { - newVolume := *new(storage.VolumeInfo) - newVolume.Id, newVolume.Size = vid, 0 + newVolume := storage.VolumeInfo{Id: vid, Size: 0} machine.Volumes = append(machine.Volumes, newVolume) m.vid2machineId[vid] = machineId - log.Println("Adding volume", vid, "from", machine.Server) + log.Println("Adding volume", vid, "from", machine.Server.Url) *vids = append(*vids, vid) m.GlobalVolumeSequence = vid + 1 } @@ -119,7 +110,7 @@ func (m *Mapper) Add(machine Machine) []uint64 { //add to vid2machineId map, and Writers array for _, v := range machine.Volumes { - log.Println("Setting volume", v.Id, "to", machine.Server) + log.Println("Setting volume", v.Id, "to", machine.Server.Url) m.vid2machineId[v.Id] = machineId if v.Size < ChunkSizeLimit { m.Writers = append(m.Writers, machineId) @@ -127,14 +118,14 @@ func (m *Mapper) Add(machine Machine) []uint64 { } //setting Writers, copy-on-write because of possible updating var Writers []int - for machine_index, machine_entry := range m.Machines { - for _, v := range machine_entry.Volumes { - if v.Size < ChunkSizeLimit { - Writers = append(Writers, machine_index) - } - } - } - m.Writers = Writers + for machine_index, machine_entry := range m.Machines { + for _, v := range machine_entry.Volumes { + if v.Size < ChunkSizeLimit { + Writers = append(Writers, machine_index) + } + } + } + m.Writers = Writers log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.Writers)) return *vids diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go index 7fde63953..346d476e5 100644 --- a/weed-fs/src/pkg/storage/needle.go +++ b/weed-fs/src/pkg/storage/needle.go @@ -1,7 +1,7 @@ package storage import ( - "io" + "io" "io/ioutil" "http" "log" @@ -9,59 +9,59 @@ import ( "strings" ) -type Needle struct{ - Cookie uint8 "random number to mitigate brute force lookups" - Key uint64 "file id" - AlternateKey uint32 "supplemental id" - Size uint32 "Data size" - Data []byte "The actual file data" - Checksum int32 "CRC32 to check integrity" - Padding []byte "Aligned to 8 bytes" +type Needle struct { + Cookie uint8 "random number to mitigate brute force lookups" + Key uint64 "file id" + AlternateKey uint32 "supplemental id" + Size uint32 "Data size" + Data []byte "The actual file data" + Checksum int32 "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" } -func NewNeedle(r *http.Request)(n *Needle){ - n = new(Needle) - form,fe:=r.MultipartReader() - if fe!=nil { - log.Fatalf("MultipartReader [ERROR] %s\n", fe) - } - part,_:=form.NextPart() - data,_:=ioutil.ReadAll(part) - n.Data = data - n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")]) +func NewNeedle(r *http.Request) (n *Needle) { + n = new(Needle) + form, fe := r.MultipartReader() + if fe != nil { + log.Fatalf("MultipartReader [ERROR] %s\n", fe) + } + part, _ := form.NextPart() + data, _ := ioutil.ReadAll(part) + n.Data = data - return + n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path, ".")]) + + return } -func (n *Needle) ParsePath(path string){ - a := strings.Split(path,"_") - log.Println("cookie",a[0],"key",a[1],"altKey",a[2]) - cookie,_ := strconv.Atoi(a[0]) - n.Cookie = uint8(cookie) - n.Key,_ = strconv.Atoui64(a[1]) - altKey,_ := strconv.Atoui64(a[2]) - n.AlternateKey = uint32(altKey) +func (n *Needle) ParsePath(path string) { + a := strings.Split(path, "_") + log.Println("cookie", a[0], "key", a[1], "altKey", a[2]) + cookie, _ := strconv.Atoi(a[0]) + n.Cookie = uint8(cookie) + n.Key, _ = strconv.Atoui64(a[1]) + altKey, _ := strconv.Atoui64(a[2]) + n.AlternateKey = uint32(altKey) } -func (n *Needle) Append(w io.Writer){ - header := make([]byte,17) - header[0] = n.Cookie - uint64toBytes(header[1:9],n.Key) - uint32toBytes(header[9:13],n.AlternateKey) - n.Size = uint32(len(n.Data)) - uint32toBytes(header[13:17],n.Size) - w.Write(header) - w.Write(n.Data) - rest := 8-((n.Size+17+4)%8) - uint32toBytes(header[0:4],uint32(n.Checksum)) - w.Write(header[0:rest+4]) +func (n *Needle) Append(w io.Writer) { + header := make([]byte, 17) + header[0] = n.Cookie + uint64toBytes(header[1:9], n.Key) + uint32toBytes(header[9:13], n.AlternateKey) + n.Size = uint32(len(n.Data)) + uint32toBytes(header[13:17], n.Size) + w.Write(header) + w.Write(n.Data) + rest := 8 - ((n.Size + 17 + 4) % 8) + uint32toBytes(header[0:4], uint32(n.Checksum)) + w.Write(header[0 : rest+4]) } -func (n *Needle) Read(r io.Reader, size uint32){ - bytes := make([]byte,size+17+4) - r.Read(bytes) - n.Cookie = bytes[0] - n.Key = bytesToUint64(bytes[1:9]) - n.AlternateKey = bytesToUint32(bytes[9:13]) - n.Size = bytesToUint32(bytes[13:17]) - n.Data = bytes[17:17+size] - n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4])) +func (n *Needle) Read(r io.Reader, size uint32) { + bytes := make([]byte, size+17+4) + r.Read(bytes) + n.Cookie = bytes[0] + n.Key = bytesToUint64(bytes[1:9]) + n.AlternateKey = bytesToUint32(bytes[9:13]) + n.Size = bytesToUint32(bytes[13:17]) + n.Data = bytes[17 : 17+size] + n.Checksum = int32(bytesToUint32(bytes[17+size : 17+size+4])) } - diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go index 9669536ac..6d0eecb74 100644 --- a/weed-fs/src/pkg/storage/needle_map.go +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -1,53 +1,53 @@ package storage import ( - "os" + "os" ) -type NeedleKey struct{ - Key uint64 "file id" - AlternateKey uint32 "supplemental id" +type NeedleKey struct { + Key uint64 "file id" + AlternateKey uint32 "supplemental id" } + func (k *NeedleKey) String() string { - var tmp [12]byte - for i :=uint(0);i<8;i++{ - tmp[i] = byte(k.Key >> (8*i)); - } - for i :=uint(0);i<4;i++{ - tmp[i+8] = byte(k.AlternateKey >> (8*i)); - } - return string(tmp[:]) + var tmp [12]byte + for i := uint(0); i < 8; i++ { + tmp[i] = byte(k.Key >> (8 * i)) + } + for i := uint(0); i < 4; i++ { + tmp[i+8] = byte(k.AlternateKey >> (8 * i)) + } + return string(tmp[:]) } -type NeedleValue struct{ - Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 "Size of the data portion" +type NeedleValue struct { + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" } -type NeedleMap struct{ - m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue +type NeedleMap struct { + m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue } -func NewNeedleMap() (nm *NeedleMap){ - nm = new(NeedleMap) - nm.m = make(map[string]*NeedleValue) - return + +func NewNeedleMap() *NeedleMap { + return &NeedleMap{m: make(map[string]*NeedleValue)} } -func (nm *NeedleMap) load(file *os.File){ +func (nm *NeedleMap) load(file *os.File) { } func makeKey(key uint64, altKey uint32) string { - var tmp [12]byte - for i :=uint(0);i<8;i++{ - tmp[i] = byte(key >> (8*i)); - } - for i :=uint(0);i<4;i++{ - tmp[i+8] = byte(altKey >> (8*i)); - } - return string(tmp[:]) -} -func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){ - nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size} -} -func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){ - element, ok = nm.m[makeKey(key,altKey)] - return + var tmp [12]byte + for i := uint(0); i < 8; i++ { + tmp[i] = byte(key >> (8 * i)) + } + for i := uint(0); i < 4; i++ { + tmp[i+8] = byte(altKey >> (8 * i)) + } + return string(tmp[:]) +} +func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32) { + nm.m[makeKey(key, altKey)] = &NeedleValue{Offset: offset, Size: size} +} +func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool) { + element, ok = nm.m[makeKey(key, altKey)] + return } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index ff6944853..f3fc2862c 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -17,13 +17,12 @@ type Store struct { PublicServer string } type VolumeInfo struct { - Id uint64 + Id uint64 Size int64 } func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { - s = new(Store) - s.Port, s.PublicServer, s.dir, s.capacity = port, publicServer, dirname, capacity + s = &Store{Port: port, PublicServer: publicServer, dir: dirname, capacity: capacity} s.volumes = make(map[uint64]*Volume) files, _ := ioutil.ReadDir(dirname) @@ -60,7 +59,7 @@ func (s *Store) Join(mserver string) { retString := post("http://"+mserver+"/dir/join", values) if retString != nil { newVids := new([]int) - log.Println("Instructed to create volume",string(retString)) + log.Println("Instructed to create volume", string(retString)) e := json.Unmarshal(retString, newVids) if e == nil { for _, vid := range *newVids { diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 4b7f6ab3a..f065233fe 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -18,9 +18,7 @@ type Volume struct { func NewVolume(dirname string, id uint64) (v *Volume) { var e os.Error - v = new(Volume) - v.dir = dirname - v.Id = id + v = &Volume{dir:dirname,Id:id, nm:NewNeedleMap()} fileName := strconv.Uitoa64(v.Id) v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) if e != nil { @@ -30,7 +28,6 @@ func NewVolume(dirname string, id uint64) (v *Volume) { if e != nil { log.Fatalf("New Volume [ERROR] %s\n", e) } - v.nm = NewNeedleMap() v.nm.load(v.indexFile) v.accessChannel = make(chan int, 1)