diff --git a/weed-fs/src/cmd/weedmaster/weedmaster.go b/weed-fs/src/cmd/weedmaster/weedmaster.go index 15f46197f..3f6b4daa2 100644 --- a/weed-fs/src/cmd/weedmaster/weedmaster.go +++ b/weed-fs/src/cmd/weedmaster/weedmaster.go @@ -37,9 +37,10 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { } } func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - fid, machine, err := mapper.PickForWrite() + c:=r.FormValue("count") + fid, count, machine, err := mapper.PickForWrite(c) if err == nil { - writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl}) + writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)}) } else { log.Println(err) writeJson(w, r, map[string]string{"error": err.Error()}) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 267a38a86..947eb65e9 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -29,7 +29,8 @@ type Mapper struct { dir string fileName string - lock sync.Mutex + volumeLock sync.Mutex + sequenceLock sync.Mutex Machines []*Machine vid2machineId map[uint32]int //machineId is +1 of the index of []*Machine, to detect not found entries Writers []uint32 // transient array of Writers volume id @@ -65,28 +66,41 @@ func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapp } return } -func (m *Mapper) PickForWrite() (string, MachineInfo, error) { +func (m *Mapper) PickForWrite(c string) (string, int, MachineInfo, error) { len_writers := len(m.Writers) if len_writers <= 0 { log.Println("No more writable volumes!") - return "", m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("No more writable volumes!") + return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("No more writable volumes!") } vid := m.Writers[rand.Intn(len_writers)] machine_id := m.vid2machineId[vid] if machine_id > 0 { machine := m.Machines[machine_id-1] - return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server, nil + fileId, count := m.NextFileId(c) + if count==0 { + return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strange count:" + c) + } + return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine.Server, nil } - return "", m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + strconv.FormatUint(uint64(vid), 10) + " is on no machine!") + return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + strconv.FormatUint(uint64(vid), 10) + " is on no machine!") } -func (m *Mapper) NextFileId() uint64 { - if m.fileIdCounter <= 0 { +func (m *Mapper) NextFileId(c string) (uint64,int) { + count, parseError := strconv.ParseUint(c,10,64) + if parseError!=nil { + if len(c)>0{ + return 0,0 + } + count = 1 + } + if m.fileIdCounter < count { + m.sequenceLock.Lock(); m.fileIdCounter = FileIdSaveInterval m.FileIdSequence += FileIdSaveInterval m.saveSequence() + m.sequenceLock.Unlock(); } - m.fileIdCounter-- - return m.FileIdSequence - m.fileIdCounter + m.fileIdCounter = m.fileIdCounter - count + return m.FileIdSequence - m.fileIdCounter, int(count) } func (m *Mapper) Get(vid uint32) (*Machine, error) { machineId := m.vid2machineId[vid] @@ -98,7 +112,7 @@ func (m *Mapper) Get(vid uint32) (*Machine, error) { func (m *Mapper) Add(machine Machine) { //check existing machine, linearly //log.Println("Adding machine", machine.Server.Url) - m.lock.Lock() + m.volumeLock.Lock() foundExistingMachineId := -1 for index, entry := range m.Machines { if machine.Server.Url == entry.Server.Url { @@ -113,7 +127,7 @@ func (m *Mapper) Add(machine Machine) { } else { m.Machines[machineId] = &machine } - m.lock.Unlock() + m.volumeLock.Unlock() //add to vid2machineId map, and Writers array for _, v := range machine.Volumes {