@@ -14,7 +14,12 @@ const (
     ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
 )
 
+type MachineInfo struct {
+    Server string //<server name/ip>[:port]
+    PublicServer string
+}
 type Machine struct {
+    MachineInfo
     Server string //<server name/ip>[:port]
     PublicServer string
     Volumes []storage.VolumeInfo
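Not part of the patch: a small self-contained Go sketch of how the struct change above resolves, since Machine now embeds MachineInfo while still declaring Server and PublicServer of its own. Go gives the shallower fields priority, so existing selectors such as machine.Server keep pointing at Machine's own strings, and the embedded copies are only reachable through an explicit MachineInfo selector. The server addresses below are made-up example values.

package main

import "fmt"

// Shapes as they stand after this hunk (the Volumes field is omitted).
type MachineInfo struct {
    Server       string //<server name/ip>[:port]
    PublicServer string
}

type Machine struct {
    MachineInfo
    Server       string
    PublicServer string
}

func main() {
    m := Machine{Server: "10.0.0.1:8080", PublicServer: "example.com:8080"}
    // The outer fields shadow the ones promoted from the embedded MachineInfo.
    fmt.Println(m.Server)             // 10.0.0.1:8080
    fmt.Println(m.MachineInfo.Server) // empty: the embedded copy was never set
}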
@@ -29,7 +34,7 @@ type Mapper struct {
     lock sync.Mutex
     Machines []*Machine
     vid2machineId map[uint64]int
-    writers []int // transient array of writers volume id
+    Writers []int // transient array of Writers volume id
 
     GlobalVolumeSequence uint64
 }
@@ -46,7 +51,7 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
     log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
     dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
     m.vid2machineId = make(map[uint64]int)
-    m.writers = *new([]int)
+    m.Writers = *new([]int)
     if e != nil {
         log.Println("Mapping File Read", e)
         m.Machines = *new([]*Machine)
@@ -56,12 +61,12 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
         decoder.Decode(&m.Machines)
         decoder.Decode(&m.GlobalVolumeSequence)
 
-        //add to vid2machineId map, and writers array
+        //add to vid2machineId map, and Writers array
         for machine_index, machine := range m.Machines {
             for _, v := range machine.Volumes {
                 m.vid2machineId[v.Id] = machine_index
                 if v.Size < ChunkSizeLimit {
-                    m.writers = append(m.writers, machine_index)
+                    m.Writers = append(m.Writers, machine_index)
                 }
             }
         }
@@ -69,12 +74,15 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
     }
     return
 }
-func (m *Mapper) PickForWrite() *Machine {
-    vid := rand.Intn(len(m.writers))
-    return m.Machines[m.writers[vid]]
+func (m *Mapper) PickForWrite() map[string]string {
+    vid := rand.Intn(len(m.Writers))
+    return map[string]string{
+        "server": m.Machines[m.Writers[vid]].Server,
+        "url":    m.Machines[m.Writers[vid]].PublicServer,
+    }
 }
-func (m *Mapper) Get(vid int) *Machine {
-    return m.Machines[vid]
+func (m *Mapper) Get(vid uint64) *Machine {
+    return m.Machines[m.vid2machineId[vid]]
 }
 func (m *Mapper) Add(machine Machine) []uint64 {
     log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines))
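Not part of the patch: a hedged sketch of what the new return type of PickForWrite buys a caller. Because the method now hands back plain strings under the "server" and "url" keys, a hypothetical HTTP handler can marshal the result straight to JSON; the handler name, route, port, and the fakeMapper stand-in are all assumptions made for this sketch, not code from the repository.

package main

import (
    "encoding/json"
    "log"
    "net/http"
)

// picker abstracts the one method the sketch needs; after this patch,
// *Mapper satisfies it.
type picker interface {
    PickForWrite() map[string]string
}

// fakeMapper stands in for the real Mapper so the sketch runs on its own.
type fakeMapper struct{}

func (fakeMapper) PickForWrite() map[string]string {
    return map[string]string{"server": "127.0.0.1:8080", "url": "localhost:8080"}
}

// dirAssignHandler (hypothetical name and route) forwards the picked machine
// to clients as JSON, e.g. {"server":"127.0.0.1:8080","url":"localhost:8080"}.
func dirAssignHandler(m picker) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        bytes, err := json.Marshal(m.PickForWrite())
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write(bytes)
    }
}

func main() {
    http.Handle("/dir/assign", dirAssignHandler(fakeMapper{}))
    log.Fatal(http.ListenAndServe(":9333", nil))
}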
@@ -109,26 +117,26 @@ func (m *Mapper) Add(machine Machine) []uint64 {
     m.Save()
     m.lock.Unlock()
 
-    //add to vid2machineId map, and writers array
+    //add to vid2machineId map, and Writers array
     for _, v := range machine.Volumes {
         log.Println("Setting volume", v.Id, "to", machine.Server)
         m.vid2machineId[v.Id] = machineId
         if v.Size < ChunkSizeLimit {
-            m.writers = append(m.writers, machineId)
+            m.Writers = append(m.Writers, machineId)
         }
     }
-    //setting writers, copy-on-write because of possible updating
-    var writers []int
+    //setting Writers, copy-on-write because of possible updating
+    var Writers []int
     for machine_index, machine_entry := range m.Machines {
         for _, v := range machine_entry.Volumes {
             if v.Size < ChunkSizeLimit {
-                writers = append(writers, machine_index)
+                Writers = append(Writers, machine_index)
             }
         }
     }
-    m.writers = writers
+    m.Writers = Writers
 
-    log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.writers))
+    log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.Writers))
     return *vids
 }
 func (m *Mapper) Save() {
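Not part of the patch: a standalone illustration of the copy-on-write step at the end of Add. The writable list is rebuilt into a private slice and only then published with a single assignment, so any code that already captured the old m.Writers keeps iterating over a stable snapshot. Variable names mirror the patch; genuinely concurrent use would still want the Mapper's lock or an atomic around the swap, which this sketch leaves out.

package main

import "fmt"

// mapper carries only the field needed to show the pattern.
type mapper struct {
    Writers []int
}

// rebuildWriters mirrors the "var Writers []int ... m.Writers = Writers"
// sequence in Add: build on the side, then swap the slice header in one go.
func (m *mapper) rebuildWriters(writableMachineIds []int) {
    var Writers []int
    for _, id := range writableMachineIds {
        Writers = append(Writers, id)
    }
    m.Writers = Writers // readers holding the old slice are unaffected
}

func main() {
    m := &mapper{Writers: []int{0}}
    old := m.Writers
    m.rebuildWriters([]int{0, 2, 5})
    fmt.Println(old)       // [0]
    fmt.Println(m.Writers) // [0 2 5]
}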