Browse Source

simplified to one machine per volume

git-svn-id: https://weed-fs.googlecode.com/svn/trunk@14 282b0af5-e82d-9cf1-ede4-77906d7719d0
pull/2/head
chris.lu@gmail.com 13 years ago
parent
commit
2c7a4eea1f
  1. 7
      weed-fs/note/startup_process.txt
  2. 25
      weed-fs/src/cmd/weeds.go
  3. 131
      weed-fs/src/pkg/directory/volume_mapping.go
  4. 14
      weed-fs/src/pkg/storage/store.go
  5. 6
      weed-fs/src/pkg/storage/volume.go

7
weed-fs/note/startup_process.txt

@ -0,0 +1,7 @@
1. clients report its own server info, volumes info,
2. master collect all volumes, separated into readable volumes, writable volumes, volume2machine mapping
machines is an array of machine info
writable volumes is an array of vids
vid2machineId maps volume id to machineId, which is the index of machines array

25
weed-fs/src/cmd/weeds.go

@ -8,7 +8,6 @@ import (
"http" "http"
"json" "json"
"log" "log"
"rand"
"strconv" "strconv"
"strings" "strings"
) )
@ -22,20 +21,8 @@ var (
func dirReadHandler(w http.ResponseWriter, r *http.Request) { func dirReadHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := strconv.Atoi(r.FormValue("volumeId")) volumeId, _ := strconv.Atoi(r.FormValue("volumeId"))
machineList := mapper.Get(volumeId)
x := rand.Intn(len(machineList))
machine := machineList[x]
bytes, _ := json.Marshal(machine)
callback := r.FormValue("callback")
w.Header().Set("Content-Type", "application/javascript")
if callback == "" {
w.Write(bytes)
} else {
w.Write([]uint8(callback))
w.Write([]uint8("("))
w.Write(bytes)
w.Write([]uint8(")"))
}
machine := mapper.Get(volumeId)
writeJson(w, r, machine)
} }
func dirWriteHandler(w http.ResponseWriter, r *http.Request) { func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
machineList := mapper.PickForWrite() machineList := mapper.PickForWrite()
@ -44,17 +31,17 @@ func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
func dirJoinHandler(w http.ResponseWriter, r *http.Request) { func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicServer := r.FormValue("publicServer") publicServer := r.FormValue("publicServer")
volumes := new([]storage.VolumeStat)
volumes := new([]storage.VolumeInfo)
json.Unmarshal([]byte(r.FormValue("volumes")), volumes) json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
capacity, _ := strconv.Atoi(r.FormValue("capacity")) capacity, _ := strconv.Atoi(r.FormValue("capacity"))
log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes")) log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes"))
vids := mapper.Add(*directory.NewMachine(s, publicServer), *volumes, capacity)
vids := mapper.Add(*directory.NewMachine(s, publicServer, *volumes, capacity))
writeJson(w, r, vids) writeJson(w, r, vids)
} }
func dirStatusHandler(w http.ResponseWriter, r *http.Request) { func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain") w.Header().Set("Content-Type", "text/plain")
bytes, _ := json.Marshal(mapper) bytes, _ := json.Marshal(mapper)
fmt.Fprint(w, bytes)
fmt.Fprint(w, string(bytes))
} }
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
w.Header().Set("Content-Type", "application/javascript") w.Header().Set("Content-Type", "application/javascript")
@ -65,7 +52,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
} else { } else {
w.Write([]uint8(callback)) w.Write([]uint8(callback))
w.Write([]uint8("(")) w.Write([]uint8("("))
w.Write(bytes)
fmt.Fprint(w, string(bytes))
w.Write([]uint8(")")) w.Write([]uint8(")"))
} }
log.Println("JSON Response", string(bytes)) log.Println("JSON Response", string(bytes))

131
weed-fs/src/pkg/directory/volume_mapping.go

@ -7,24 +7,36 @@ import (
"rand" "rand"
"log" "log"
"storage" "storage"
"sync"
)
const (
ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
) )
type Machine struct { type Machine struct {
Server string //<server name/ip>[:port] Server string //<server name/ip>[:port]
PublicServer string PublicServer string
CanWrite bool
Volumes []storage.VolumeInfo
Capacity int
} }
type Mapper struct { type Mapper struct {
dir string dir string
fileName string fileName string
capacity int capacity int
Machines [][]Machine //initial version only support one copy per machine
writers [][]Machine // transient value to lookup writers fast
lock sync.Mutex
Machines []*Machine
vid2machineId map[uint64]int
writers []int // transient array of writers volume id
GlobalVolumeSequence uint64
} }
func NewMachine(server, publicServer string) (m *Machine) {
func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) {
m = new(Machine) m = new(Machine)
m.Server, m.PublicServer = server, publicServer
m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity
return return
} }
@ -33,74 +45,90 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
m.dir, m.fileName, m.capacity = dirname, filename, capacity m.dir, m.fileName, m.capacity = dirname, filename, capacity
log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
m.Machines = *new([][]Machine)
m.writers = *new([][]Machine)
m.vid2machineId = make(map[uint64]int)
m.writers = *new([]int)
if e != nil { if e != nil {
log.Println("Mapping File Read", e) log.Println("Mapping File Read", e)
m.Machines = *new([]*Machine)
} else { } else {
decoder := gob.NewDecoder(dataFile) decoder := gob.NewDecoder(dataFile)
defer dataFile.Close()
defer dataFile.Close()
decoder.Decode(&m.Machines) decoder.Decode(&m.Machines)
for _, list := range m.Machines {
//TODO: what if a list has mixed readers and writers? Now it's treated as readonly
allCanWrite := false
for _, entry := range list {
allCanWrite = allCanWrite && entry.CanWrite
}
if allCanWrite {
m.writers = append(m.writers, list)
decoder.Decode(&m.GlobalVolumeSequence)
//add to vid2machineId map, and writers array
for machine_index, machine := range m.Machines {
for _, v := range machine.Volumes {
m.vid2machineId[v.Id] = machine_index
if v.Size < ChunkSizeLimit {
m.writers = append(m.writers, machine_index)
}
} }
} }
log.Println("Loaded mapping size", len(m.Machines))
log.Println("Loaded mapping size", len(m.Machines))
} }
return return
} }
func (m *Mapper) PickForWrite() []Machine {
vid := rand.Intn(len(m.Machines))
return m.Machines[vid]
func (m *Mapper) PickForWrite() *Machine {
vid := rand.Intn(len(m.writers))
return m.Machines[m.writers[vid]]
} }
func (m *Mapper) Get(vid int) []Machine {
func (m *Mapper) Get(vid int) *Machine {
return m.Machines[vid] return m.Machines[vid]
} }
func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int {
log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines))
log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines))
maxId := len(m.Machines)-1
for _, v := range volumes {
if maxId < int(v.Id) {
maxId = int(v.Id)
func (m *Mapper) Add(machine Machine) []uint64 {
log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines))
log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines))
//check existing machine, linearly
m.lock.Lock()
foundExistingMachineId := -1
for index, entry := range m.Machines {
if machine.Server == entry.Server {
foundExistingMachineId = index
break
} }
} }
for i := len(m.Machines); i <= maxId; i++ {
m.Machines = append(m.Machines, nil)
}
log.Println("Machine list now is", len(m.Machines))
for _, v := range volumes {
found := false
existing := m.Machines[v.Id]
for _, entry := range existing {
if machine.Server == entry.Server {
found = true
break
}
}
if !found {
m.Machines[v.Id] = append(existing, machine)
log.Println("Setting volume", v.Id, "to", machine.Server)
}
machineId := foundExistingMachineId
if machineId < 0 {
machineId = len(m.Machines)
m.Machines = append(m.Machines, &machine)
} }
vids := new([]int)
for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 {
list := new([]Machine)
*list = append(*list, machine)
m.Machines = append(m.Machines, *list)
//generate new volumes
vids := new([]uint64)
for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 {
newVolume := *new(storage.VolumeInfo)
newVolume.Id, newVolume.Size = vid, 0
machine.Volumes = append(machine.Volumes, newVolume)
m.vid2machineId[vid] = machineId
log.Println("Adding volume", vid, "from", machine.Server) log.Println("Adding volume", vid, "from", machine.Server)
*vids = append(*vids, vid) *vids = append(*vids, vid)
m.GlobalVolumeSequence = vid + 1
} }
m.Save() m.Save()
log.Println("Dir size =>", len(m.Machines))
m.lock.Unlock()
//add to vid2machineId map, and writers array
for _, v := range machine.Volumes {
log.Println("Setting volume", v.Id, "to", machine.Server)
m.vid2machineId[v.Id] = machineId
if v.Size < ChunkSizeLimit {
m.writers = append(m.writers, machineId)
}
}
//setting writers, copy-on-write because of possible updating
var writers []int
for machine_index, machine_entry := range m.Machines {
for _, v := range machine_entry.Volumes {
if v.Size < ChunkSizeLimit {
writers = append(writers, machine_index)
}
}
}
m.writers = writers
log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.writers))
return *vids return *vids
} }
func (m *Mapper) Save() { func (m *Mapper) Save() {
@ -112,4 +140,5 @@ func (m *Mapper) Save() {
defer dataFile.Close() defer dataFile.Close()
encoder := gob.NewEncoder(dataFile) encoder := gob.NewEncoder(dataFile)
encoder.Encode(m.Machines) encoder.Encode(m.Machines)
encoder.Encode(m.GlobalVolumeSequence)
} }

14
weed-fs/src/pkg/storage/store.go

@ -16,9 +16,9 @@ type Store struct {
Port int Port int
PublicServer string PublicServer string
} }
type VolumeStat struct {
Id uint64 "id"
CanWrite bool
type VolumeInfo struct {
Id uint64
Size int64
} }
func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) {
@ -44,10 +44,10 @@ func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (
} }
func (s *Store) Join(mserver string) { func (s *Store) Join(mserver string) {
stats := new([]*VolumeStat)
for k, _ := range s.volumes {
s := new(VolumeStat)
s.Id, s.CanWrite = k, true
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
s := new(VolumeInfo)
s.Id, s.Size = k, v.Size()
*stats = append(*stats, s) *stats = append(*stats, s)
} }
bytes, _ := json.Marshal(stats) bytes, _ := json.Marshal(stats)

6
weed-fs/src/pkg/storage/volume.go

@ -38,12 +38,12 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
return return
} }
func (v *Volume) CanWrite(limit int64) bool {
func (v *Volume) Size() int64 {
stat, e:=v.dataFile.Stat() stat, e:=v.dataFile.Stat()
if e!=nil{ if e!=nil{
return stat.Size < limit
return stat.Size
} }
return false
return -1
} }
func (v *Volume) Close() { func (v *Volume) Close() {
close(v.accessChannel) close(v.accessChannel)

Loading…
Cancel
Save