Browse Source

change to vid~machines mapping

pull/2/head
Chris Lu 12 years ago
parent
commit
a74f6cf593
  1. 10
      weed-fs/src/cmd/weed/master.go
  2. 62
      weed-fs/src/pkg/directory/volume_mapping.go

10
weed-fs/src/cmd/weed/master.go

@ -41,12 +41,16 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid = vid[0:commaSep]
}
volumeId, _ := storage.NewVolumeId(vid)
machine, e := mapper.Get(volumeId)
machines, e := mapper.Get(volumeId)
if e == nil {
writeJson(w, r, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
var ret []map[string]string
for _, machine := range machines {
ret = append(ret,map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
}
writeJson(w, r, ret)
} else {
log.Println("Invalid volume id", volumeId)
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found"})
writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. " + e.Error()})
}
}
func dirAssignHandler(w http.ResponseWriter, r *http.Request) {

62
weed-fs/src/pkg/directory/volume_mapping.go

@ -18,10 +18,10 @@ type Machine struct {
}
type Mapper struct {
Machines map[string]*Machine
vid2machine map[storage.VolumeId]*Machine
Writers []storage.VolumeId // transient array of Writers volume id
pulse int64
Machines map[string]*Machine
vid2machines map[storage.VolumeId][]*Machine
Writers []storage.VolumeId // transient array of Writers volume id
pulse int64
volumeSizeLimit uint64
@ -34,7 +34,7 @@ func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo, lastSeen
func NewMapper(dirname string, filename string, volumeSizeLimit uint64, pulse int) (m *Mapper) {
m = &Mapper{}
m.vid2machine = make(map[storage.VolumeId]*Machine)
m.vid2machines = make(map[storage.VolumeId][]*Machine)
m.volumeSizeLimit = volumeSizeLimit
m.Writers = *new([]storage.VolumeId)
m.Machines = make(map[string]*Machine)
@ -51,37 +51,65 @@ func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) {
return "", 0, nil, errors.New("No more writable volumes!")
}
vid := m.Writers[rand.Intn(len_writers)]
machine := m.vid2machine[vid]
if machine != nil {
machines := m.vid2machines[vid]
if machines != nil && len(machines)>0 {
fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1))
if count == 0 {
return "", 0, nil, errors.New("Strange count:" + c)
}
return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine, nil
//always use the first server to write
return NewFileId(vid, fileId, rand.Uint32()).String(), count, machines[0], nil
}
return "", 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) {
machine := m.vid2machine[vid]
if machine == nil {
func (m *Mapper) Get(vid storage.VolumeId) ([]*Machine, error) {
machines := m.vid2machines[vid]
if machines == nil {
return nil, errors.New("invalid volume id " + vid.String())
}
return machine, nil
return machines, nil
}
func (m *Mapper) Add(machine *Machine) {
m.Machines[machine.Url] = machine
//add to vid2machine map, and Writers array
for _, v := range machine.Volumes {
m.vid2machine[v.Id] = machine
list := m.vid2machines[v.Id]
found := false
for index, entry := range list {
if machine.Url == entry.Url {
list[index] = machine
found = true
}
}
if !found {
m.vid2machines[v.Id] = append(m.vid2machines[v.Id], machine)
}
}
m.refreshWritableVolumes()
}
func (m *Mapper) remove(machine *Machine) {
delete(m.Machines,machine.Url)
for _, v := range machine.Volumes {
delete(m.vid2machine,v.Id)
}
delete(m.Machines, machine.Url)
for _, v := range machine.Volumes {
list := m.vid2machines[v.Id]
foundIndex := -1
for index, entry := range list {
if machine.Url == entry.Url {
foundIndex = index
}
}
m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id])
}
}
func deleteFromSlice(i int, slice []*Machine) []*Machine{
switch i {
case -1://do nothing
case 0: slice = slice[1:]
case len(slice)-1: slice = slice[:len(slice)-1]
default: slice = append(slice[:i], slice[i+1:]...)
}
return slice
}
func (m *Mapper) StartRefreshWritableVolumes() {
go func() {
for {

Loading…
Cancel
Save