diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index 13a701e20..a92b7c0f1 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -41,12 +41,16 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid = vid[0:commaSep] } volumeId, _ := storage.NewVolumeId(vid) - machine, e := mapper.Get(volumeId) + machines, e := mapper.Get(volumeId) if e == nil { - writeJson(w, r, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) + var ret []map[string]string + for _, machine := range machines { + ret = append(ret,map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) + } + writeJson(w, r, ret) } else { log.Println("Invalid volume id", volumeId) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found"}) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. " + e.Error()}) } } func dirAssignHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 6b52268f7..fd1df6e39 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -18,10 +18,10 @@ type Machine struct { } type Mapper struct { - Machines map[string]*Machine - vid2machine map[storage.VolumeId]*Machine - Writers []storage.VolumeId // transient array of Writers volume id - pulse int64 + Machines map[string]*Machine + vid2machines map[storage.VolumeId][]*Machine + Writers []storage.VolumeId // transient array of Writers volume id + pulse int64 volumeSizeLimit uint64 @@ -34,7 +34,7 @@ func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo, lastSeen func NewMapper(dirname string, filename string, volumeSizeLimit uint64, pulse int) (m *Mapper) { m = &Mapper{} - m.vid2machine = make(map[storage.VolumeId]*Machine) + m.vid2machines = make(map[storage.VolumeId][]*Machine) m.volumeSizeLimit = volumeSizeLimit m.Writers = *new([]storage.VolumeId) m.Machines = make(map[string]*Machine) @@ -51,37 +51,65 @@ func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) { return "", 0, nil, errors.New("No more writable volumes!") } vid := m.Writers[rand.Intn(len_writers)] - machine := m.vid2machine[vid] - if machine != nil { + machines := m.vid2machines[vid] + if machines != nil && len(machines)>0 { fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1)) if count == 0 { return "", 0, nil, errors.New("Strange count:" + c) } - return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine, nil + //always use the first server to write + return NewFileId(vid, fileId, rand.Uint32()).String(), count, machines[0], nil } return "", 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } -func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) { - machine := m.vid2machine[vid] - if machine == nil { +func (m *Mapper) Get(vid storage.VolumeId) ([]*Machine, error) { + machines := m.vid2machines[vid] + if machines == nil { return nil, errors.New("invalid volume id " + vid.String()) } - return machine, nil + return machines, nil } func (m *Mapper) Add(machine *Machine) { m.Machines[machine.Url] = machine //add to vid2machine map, and Writers array for _, v := range machine.Volumes { - m.vid2machine[v.Id] = machine + list := m.vid2machines[v.Id] + found := false + for index, entry := range list { + if machine.Url == entry.Url { + list[index] = machine + found = true + } + } + if !found { + m.vid2machines[v.Id] = append(m.vid2machines[v.Id], machine) + } } m.refreshWritableVolumes() } func (m *Mapper) remove(machine *Machine) { - delete(m.Machines,machine.Url) - for _, v := range machine.Volumes { - delete(m.vid2machine,v.Id) - } + delete(m.Machines, machine.Url) + for _, v := range machine.Volumes { + list := m.vid2machines[v.Id] + foundIndex := -1 + for index, entry := range list { + if machine.Url == entry.Url { + foundIndex = index + } + } + m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id]) + } +} +func deleteFromSlice(i int, slice []*Machine) []*Machine{ + switch i { + case -1://do nothing + case 0: slice = slice[1:] + case len(slice)-1: slice = slice[:len(slice)-1] + default: slice = append(slice[:i], slice[i+1:]...) + } + return slice } + func (m *Mapper) StartRefreshWritableVolumes() { go func() { for {