Browse Source

add capability to assign a batch of file ids

git-svn-id: https://weed-fs.googlecode.com/svn/trunk@50 282b0af5-e82d-9cf1-ede4-77906d7719d0
pull/2/head
chris.lu@gmail.com 13 years ago
parent
commit
68a216586f
  1. 5
      weed-fs/src/cmd/weedmaster/weedmaster.go
  2. 36
      weed-fs/src/pkg/directory/volume_mapping.go

5
weed-fs/src/cmd/weedmaster/weedmaster.go

@ -37,9 +37,10 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func dirAssignHandler(w http.ResponseWriter, r *http.Request) { func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
fid, machine, err := mapper.PickForWrite()
c:=r.FormValue("count")
fid, count, machine, err := mapper.PickForWrite(c)
if err == nil { if err == nil {
writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl})
writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)})
} else { } else {
log.Println(err) log.Println(err)
writeJson(w, r, map[string]string{"error": err.Error()}) writeJson(w, r, map[string]string{"error": err.Error()})

36
weed-fs/src/pkg/directory/volume_mapping.go

@ -29,7 +29,8 @@ type Mapper struct {
dir string dir string
fileName string fileName string
lock sync.Mutex
volumeLock sync.Mutex
sequenceLock sync.Mutex
Machines []*Machine Machines []*Machine
vid2machineId map[uint32]int //machineId is +1 of the index of []*Machine, to detect not found entries vid2machineId map[uint32]int //machineId is +1 of the index of []*Machine, to detect not found entries
Writers []uint32 // transient array of Writers volume id Writers []uint32 // transient array of Writers volume id
@ -65,28 +66,41 @@ func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapp
} }
return return
} }
func (m *Mapper) PickForWrite() (string, MachineInfo, error) {
func (m *Mapper) PickForWrite(c string) (string, int, MachineInfo, error) {
len_writers := len(m.Writers) len_writers := len(m.Writers)
if len_writers <= 0 { if len_writers <= 0 {
log.Println("No more writable volumes!") log.Println("No more writable volumes!")
return "", m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("No more writable volumes!")
return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("No more writable volumes!")
} }
vid := m.Writers[rand.Intn(len_writers)] vid := m.Writers[rand.Intn(len_writers)]
machine_id := m.vid2machineId[vid] machine_id := m.vid2machineId[vid]
if machine_id > 0 { if machine_id > 0 {
machine := m.Machines[machine_id-1] machine := m.Machines[machine_id-1]
return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server, nil
fileId, count := m.NextFileId(c)
if count==0 {
return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strange count:" + c)
}
return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine.Server, nil
} }
return "", m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + strconv.FormatUint(uint64(vid), 10) + " is on no machine!")
return "", 0, m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + strconv.FormatUint(uint64(vid), 10) + " is on no machine!")
} }
func (m *Mapper) NextFileId() uint64 {
if m.fileIdCounter <= 0 {
func (m *Mapper) NextFileId(c string) (uint64,int) {
count, parseError := strconv.ParseUint(c,10,64)
if parseError!=nil {
if len(c)>0{
return 0,0
}
count = 1
}
if m.fileIdCounter < count {
m.sequenceLock.Lock();
m.fileIdCounter = FileIdSaveInterval m.fileIdCounter = FileIdSaveInterval
m.FileIdSequence += FileIdSaveInterval m.FileIdSequence += FileIdSaveInterval
m.saveSequence() m.saveSequence()
m.sequenceLock.Unlock();
} }
m.fileIdCounter--
return m.FileIdSequence - m.fileIdCounter
m.fileIdCounter = m.fileIdCounter - count
return m.FileIdSequence - m.fileIdCounter, int(count)
} }
func (m *Mapper) Get(vid uint32) (*Machine, error) { func (m *Mapper) Get(vid uint32) (*Machine, error) {
machineId := m.vid2machineId[vid] machineId := m.vid2machineId[vid]
@ -98,7 +112,7 @@ func (m *Mapper) Get(vid uint32) (*Machine, error) {
func (m *Mapper) Add(machine Machine) { func (m *Mapper) Add(machine Machine) {
//check existing machine, linearly //check existing machine, linearly
//log.Println("Adding machine", machine.Server.Url) //log.Println("Adding machine", machine.Server.Url)
m.lock.Lock()
m.volumeLock.Lock()
foundExistingMachineId := -1 foundExistingMachineId := -1
for index, entry := range m.Machines { for index, entry := range m.Machines {
if machine.Server.Url == entry.Server.Url { if machine.Server.Url == entry.Server.Url {
@ -113,7 +127,7 @@ func (m *Mapper) Add(machine Machine) {
} else { } else {
m.Machines[machineId] = &machine m.Machines[machineId] = &machine
} }
m.lock.Unlock()
m.volumeLock.Unlock()
//add to vid2machineId map, and Writers array //add to vid2machineId map, and Writers array
for _, v := range machine.Volumes { for _, v := range machine.Volumes {

Loading…
Cancel
Save