|
|
@ -11,7 +11,8 @@ import ( |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
|
|
|
|
ChunkSizeLimit = 1 * 1000 * 1000 * 1000 //1G, can not be more than max(uint32)*8
|
|
|
|
FileIdSaveInterval = 10000 |
|
|
|
) |
|
|
|
|
|
|
|
type MachineInfo struct { |
|
|
@ -35,14 +36,17 @@ type Mapper struct { |
|
|
|
Writers []int // transient array of Writers volume id
|
|
|
|
|
|
|
|
GlobalVolumeSequence uint64 |
|
|
|
|
|
|
|
FileIdSequence uint64 |
|
|
|
fileIdCounter uint64 |
|
|
|
} |
|
|
|
|
|
|
|
func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine { |
|
|
|
return &Machine{Server:MachineInfo{Url:server,PublicUrl:publicServer},Volumes:volumes,Capacity:capacity} |
|
|
|
return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes, Capacity: capacity} |
|
|
|
} |
|
|
|
|
|
|
|
func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { |
|
|
|
m = &Mapper{dir:dirname,fileName:filename,capacity:capacity} |
|
|
|
m = &Mapper{dir: dirname, fileName: filename, capacity: capacity} |
|
|
|
log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) |
|
|
|
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) |
|
|
|
m.vid2machineId = make(map[uint64]int) |
|
|
@ -67,11 +71,33 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { |
|
|
|
} |
|
|
|
log.Println("Loaded mapping size", len(m.Machines)) |
|
|
|
} |
|
|
|
|
|
|
|
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) |
|
|
|
if se != nil { |
|
|
|
m.FileIdSequence = FileIdSaveInterval |
|
|
|
log.Println("Setting file id sequence", m.FileIdSequence) |
|
|
|
} else { |
|
|
|
decoder := gob.NewDecoder(seqFile) |
|
|
|
defer seqFile.Close() |
|
|
|
decoder.Decode(&m.FileIdSequence) |
|
|
|
log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence + FileIdSaveInterval) |
|
|
|
//in case the server stops between intervals
|
|
|
|
m.FileIdSequence += FileIdSaveInterval |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
func (m *Mapper) PickForWrite() MachineInfo { |
|
|
|
vid := rand.Intn(len(m.Writers)) |
|
|
|
return m.Machines[m.Writers[vid]].Server |
|
|
|
func (m *Mapper) PickForWrite() (vid uint64, server MachineInfo) { |
|
|
|
machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]] |
|
|
|
vid = machine.Volumes[rand.Intn(len(machine.Volumes))].Id |
|
|
|
return vid, machine.Server |
|
|
|
} |
|
|
|
func (m *Mapper) NextFileId() uint64 { |
|
|
|
if m.fileIdCounter <= 0 { |
|
|
|
m.fileIdCounter = FileIdSaveInterval |
|
|
|
m.saveSequence() |
|
|
|
} |
|
|
|
m.fileIdCounter-- |
|
|
|
return m.FileIdSequence - m.fileIdCounter |
|
|
|
} |
|
|
|
func (m *Mapper) Get(vid uint64) *Machine { |
|
|
|
return m.Machines[m.vid2machineId[vid]] |
|
|
@ -141,3 +167,13 @@ func (m *Mapper) Save() { |
|
|
|
encoder.Encode(m.Machines) |
|
|
|
encoder.Encode(m.GlobalVolumeSequence) |
|
|
|
} |
|
|
|
func (m *Mapper) saveSequence() { |
|
|
|
log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) |
|
|
|
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) |
|
|
|
if e != nil { |
|
|
|
log.Fatalf("Sequence File Save [ERROR] %s\n", e) |
|
|
|
} |
|
|
|
defer seqFile.Close() |
|
|
|
encoder := gob.NewEncoder(seqFile) |
|
|
|
encoder.Encode(m.FileIdSequence) |
|
|
|
} |