diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go index 56cbf9e86..0aa03f85a 100644 --- a/weed-fs/src/cmd/weedc.go +++ b/weed-fs/src/cmd/weedc.go @@ -29,6 +29,10 @@ var ( func statusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, store.Status()) } +func addVolumeHandler(w http.ResponseWriter, r *http.Request) { + store.AddVolume(r.FormValue("volume")) + writeJson(w, r, store.Status()) +} func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": @@ -133,6 +137,7 @@ func main() { defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) + http.HandleFunc("/add_volume", addVolumeHandler) go func() { for { diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index 1965daf85..0a1b78f0e 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -27,16 +27,22 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid = vid[0:commaSep] } volumeId, _ := strconv.Atoui64(vid) - machine := mapper.Get(uint32(volumeId)) - writeJson(w, r, machine.Server) -} -func dirWriteHandler(w http.ResponseWriter, r *http.Request) { - _, machine := mapper.PickForWrite() - writeJson(w, r, machine) + machine, e := mapper.Get(uint32(volumeId)) + if e == nil { + writeJson(w, r, machine.Server) + } else { + log.Println("Invalid volume id", volumeId) + writeJson(w, r, map[string]string{"error": "volume id " + strconv.Uitoa64(volumeId) + " not found"}) + } } func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - fid, machine := mapper.PickForWrite() - writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url}) + fid, machine, err := mapper.PickForWrite() + if err == nil { + writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url}) + } else { + log.Println(err) + writeJson(w, r, map[string]string{"error": err.String()}) + } } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") @@ -71,7 +77,6 @@ func main() { mapper = directory.NewMapper(*metaFolder, "directory") http.HandleFunc("/dir/assign", dirAssignHandler) http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/write", dirWriteHandler) http.HandleFunc("/dir/join", dirJoinHandler) http.HandleFunc("/dir/status", dirStatusHandler) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index bee1f78dd..a51294411 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -7,11 +7,12 @@ import ( "rand" "log" "storage" + "strconv" "sync" ) const ( - ChunkSizeLimit = 1 * 1000 * 1000 * 1000 //1G, can not be more than max(uint32)*8 + ChunkSizeLimit = 32 * 1024 * 1024 //32G, can not be more than max(uint32)*8 FileIdSaveInterval = 10000 ) @@ -61,10 +62,15 @@ func NewMapper(dirname string, filename string) (m *Mapper) { } return } -func (m *Mapper) PickForWrite() (string, MachineInfo) { - machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]] +func (m *Mapper) PickForWrite() (string, MachineInfo, os.Error) { + len_writers := len(m.Writers) + if len_writers<=0 { + log.Println("No more writable volumes!") + return "",m.Machines[rand.Intn(len(m.Machines))].Server, os.NewError("No more writable volumes!") + } + machine := m.Machines[m.Writers[rand.Intn(len_writers)]] vid := machine.Volumes[rand.Intn(len(machine.Volumes))].Id - return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server + return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server,nil } func (m *Mapper) NextFileId() uint64 { if m.fileIdCounter <= 0 { @@ -75,8 +81,12 @@ func (m *Mapper) NextFileId() uint64 { m.fileIdCounter-- return m.FileIdSequence - m.fileIdCounter } -func (m *Mapper) Get(vid uint32) *Machine { - return m.Machines[m.vid2machineId[vid]] +func (m *Mapper) Get(vid uint32) (*Machine, os.Error) { + machineId := m.vid2machineId[vid] + if machineId <=0{ + return nil, os.NewError("invalid volume id " + strconv.Uitob64(uint64(vid),10)) + } + return m.Machines[machineId-1],nil } func (m *Mapper) Add(machine Machine){ //check existing machine, linearly @@ -100,7 +110,7 @@ func (m *Mapper) Add(machine Machine){ //add to vid2machineId map, and Writers array for _, v := range machine.Volumes { //log.Println("Setting volume", v.Id, "to", machine.Server.Url) - m.vid2machineId[v.Id] = machineId + m.vid2machineId[v.Id] = machineId+1 //use base 1 indexed, to detect not found cases } //setting Writers, copy-on-write because of possible updating var writers []int diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 900c74bf1..221e9f565 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -24,37 +24,45 @@ type VolumeInfo struct { func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *Store) { s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} s.volumes = make(map[uint64]*Volume) + + s.AddVolume(volumeListString) - for _, range_string := range strings.Split(volumeListString, ",") { - if strings.Index(range_string, "-") < 0 { - id_string := range_string + log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") + return +} +func (s *Store) AddVolume(volumeListString string) os.Error{ + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string id, err := strconv.Atoui64(id_string) if err != nil { - log.Println("Volume Id", id_string, "is not a valid unsigned integer! Skipping it...") - continue + return os.NewError("Volume Id " + id_string+ " is not a valid unsigned integer!") + } + s.addVolume(id) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.Atoui64(pair[0]) + if start_err != nil { + return os.NewError("Volume Start Id" + pair[0] + " is not a valid unsigned integer!") } - s.volumes[id] = NewVolume(s.dir, uint32(id)) - } else { - pair := strings.Split(range_string, "-") - start, start_err := strconv.Atoui64(pair[0]) - if start_err != nil { - log.Println("Volume Id", pair[0], "is not a valid unsigned integer! Skipping it...") - continue - } end, end_err := strconv.Atoui64(pair[1]) if end_err != nil { - log.Println("Volume Id", pair[1], "is not a valid unsigned integer! Skipping it...") - continue + return os.NewError("Volume End Id" + pair[1] + " is not a valid unsigned integer!") } for id := start; id<=end; id++ { - s.volumes[id] = NewVolume(s.dir, uint32(id)) - } - } - } - log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") - return + s.addVolume(id) + } + } + } + return nil +} +func (s *Store) addVolume(vid uint64) os.Error{ + if s.volumes[vid]!=nil { + return os.NewError("Volume Id "+strconv.Uitoa64(vid)+" already exists!") + } + s.volumes[vid] = NewVolume(s.dir, uint32(vid)) + return nil } - func (s *Store) Status() *[]*VolumeInfo { stats := new([]*VolumeInfo) for k, v := range s.volumes {