diff --git a/weed-fs/src/cmd/weedc.go b/weed-fs/src/cmd/weedc.go index a78d36c69..29c0f1500 100644 --- a/weed-fs/src/cmd/weedc.go +++ b/weed-fs/src/cmd/weedc.go @@ -10,15 +10,17 @@ import ( "mime" "strconv" "strings" + "time" ) var ( port = flag.Int("port", 8080, "http listen port") chunkFolder = flag.String("dir", "/tmp", "data directory to store files") - chunkCount = flag.Int("chunks", 5, "data chunks to store files") + volumes = flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read") metaServer = flag.String("mserver", "localhost:9333", "metadata server to store mappings") IsDebug = flag.Bool("debug", false, "enable debug mode") + pulse = flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") store *storage.Store ) @@ -38,8 +40,8 @@ func storeHandler(w http.ResponseWriter, r *http.Request) { } func GetHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) - vid, fid, ext := parseURLPath(r.URL.Path) - volumeId, _ := strconv.Atoui64(vid) + vid, fid, ext := parseURLPath(r.URL.Path) + volumeId, _ := strconv.Atoui64(vid) n.ParsePath(fid) if *IsDebug { @@ -54,14 +56,14 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) return } - if ext!="" { + if ext != "" { w.Header().Set("Content-Type", mime.TypeByExtension(ext)) } w.Write(n.Data) } func PostHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := strconv.Atoui64(vid) + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, e := strconv.Atoui64(vid) if e != nil { writeJson(w, r, e) } else { @@ -73,19 +75,19 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { } func DeleteHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) - vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := strconv.Atoui64(vid) - n.ParsePath(fid) + vid, fid, _ := parseURLPath(r.URL.Path) + volumeId, _ := strconv.Atoui64(vid) + n.ParsePath(fid) - cookie := n.Cookie - count, _ := store.Read(volumeId, n) + cookie := n.Cookie + count, _ := store.Read(volumeId, n) - if n.Cookie != cookie { - log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 store.Write(volumeId, n) m := make(map[string]uint32) m["size"] = uint32(count) @@ -105,33 +107,38 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { } //log.Println("JSON Response", string(bytes)) } -func parseURLPath(path string)(vid, fid, ext string){ - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex+1:] - } - if commaIndex <= 0 { - log.Println("unknown file id", path[sepIndex+1:commaIndex]) - return - } - return +func parseURLPath(path string) (vid, fid, ext string) { + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex+1:] + } + if commaIndex <= 0 { + log.Println("unknown file id", path[sepIndex+1:commaIndex]) + return + } + return } func main() { flag.Parse() //TODO: now default to 1G, this value should come from server? - store = storage.NewStore(*port, *publicServer, *chunkFolder, 1024*1024*1024, *chunkCount) + store = storage.NewStore(*port, *publicServer, *chunkFolder, *volumes) defer store.Close() http.HandleFunc("/", storeHandler) http.HandleFunc("/status", statusHandler) - store.Join(*metaServer) + go func() { + for { + store.Join(*metaServer) + time.Sleep(int64(*pulse) * 1e9) + } + }() log.Println("store joined at", *metaServer) log.Println("Start storage service at http://127.0.0.1:" + strconv.Itoa(*port)) diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index 396f5f271..f80b23f6f 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -17,6 +17,7 @@ var ( metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") capacity = flag.Int("capacity", 100, "maximum number of volumes to hold") mapper *directory.Mapper + IsDebug = flag.Bool("debug", false, "verbose debug information") ) func dirLookupHandler(w http.ResponseWriter, r *http.Request) { @@ -42,10 +43,8 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { publicServer := r.FormValue("publicServer") volumes := new([]storage.VolumeInfo) json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - capacity, _ := strconv.Atoi(r.FormValue("capacity")) - log.Println("Recieved joining request from remote address", s, "capacity=", capacity, "volumes", r.FormValue("volumes")) - vids := mapper.Add(*directory.NewMachine(s, publicServer, *volumes, capacity)) - writeJson(w, r, vids) + log.Println("Recieved updates from", s, "volumes", r.FormValue("volumes")) + mapper.Add(*directory.NewMachine(s, publicServer, *volumes)) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, mapper) @@ -67,8 +66,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { func main() { flag.Parse() - mapper = directory.NewMapper(*metaFolder, "directory", *capacity) - defer mapper.Save() + mapper = directory.NewMapper(*metaFolder, "directory") http.HandleFunc("/dir/assign", dirAssignHandler) http.HandleFunc("/dir/lookup", dirLookupHandler) http.HandleFunc("/dir/write", dirWriteHandler) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 3c10e1a78..a4cce21df 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -22,65 +22,40 @@ type MachineInfo struct { type Machine struct { Server MachineInfo Volumes []storage.VolumeInfo - Capacity int } type Mapper struct { dir string fileName string - capacity int lock sync.Mutex Machines []*Machine vid2machineId map[uint32]int Writers []int // transient array of Writers volume id - GlobalVolumeSequence uint32 - FileIdSequence uint64 fileIdCounter uint64 } -func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine { - return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes, Capacity: capacity} +func NewMachine(server, publicServer string, volumes []storage.VolumeInfo) *Machine { + return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes} } -func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { - m = &Mapper{dir: dirname, fileName: filename, capacity: capacity} - log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) +func NewMapper(dirname string, filename string) (m *Mapper) { + m = &Mapper{dir: dirname, fileName: filename} m.vid2machineId = make(map[uint32]int) m.Writers = *new([]int) - if e != nil { - log.Println("Mapping File Read", e) - m.Machines = *new([]*Machine) - } else { - decoder := gob.NewDecoder(dataFile) - defer dataFile.Close() - decoder.Decode(&m.Machines) - decoder.Decode(&m.GlobalVolumeSequence) - - //add to vid2machineId map, and Writers array - for machine_index, machine := range m.Machines { - for _, v := range machine.Volumes { - m.vid2machineId[v.Id] = machine_index - if v.Size < ChunkSizeLimit { - m.Writers = append(m.Writers, machine_index) - } - } - } - log.Println("Loaded mapping size", len(m.Machines)) - } + m.Machines = *new([]*Machine) seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) if se != nil { m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) + log.Println("Setting file id sequence", m.FileIdSequence) } else { decoder := gob.NewDecoder(seqFile) defer seqFile.Close() decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence + FileIdSaveInterval) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) //in case the server stops between intervals m.FileIdSequence += FileIdSaveInterval } @@ -89,22 +64,20 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { func (m *Mapper) PickForWrite() (string, MachineInfo) { machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]] vid := machine.Volumes[rand.Intn(len(machine.Volumes))].Id - return NewFileId(vid,m.NextFileId(),rand.Uint32()).String(), machine.Server + return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server } func (m *Mapper) NextFileId() uint64 { - if m.fileIdCounter <= 0 { - m.fileIdCounter = FileIdSaveInterval - m.saveSequence() - } + if m.fileIdCounter <= 0 { + m.fileIdCounter = FileIdSaveInterval + m.saveSequence() + } m.fileIdCounter-- return m.FileIdSequence - m.fileIdCounter } func (m *Mapper) Get(vid uint32) *Machine { return m.Machines[m.vid2machineId[vid]] } -func (m *Mapper) Add(machine Machine) []uint32 { - log.Println("Adding existing", machine.Server.Url, len(machine.Volumes), "volumes to dir", len(m.Machines)) - log.Println("Adding new ", machine.Server.Url, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines)) +func (m *Mapper) Add(machine Machine){ //check existing machine, linearly m.lock.Lock() foundExistingMachineId := -1 @@ -119,24 +92,11 @@ func (m *Mapper) Add(machine Machine) []uint32 { machineId = len(m.Machines) m.Machines = append(m.Machines, &machine) } - - //generate new volumes - vids := new([]uint32) - for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 { - newVolume := storage.VolumeInfo{Id: vid, Size: 0} - machine.Volumes = append(machine.Volumes, newVolume) - m.vid2machineId[vid] = machineId - log.Println("Adding volume", vid, "from", machine.Server.Url) - *vids = append(*vids, vid) - m.GlobalVolumeSequence = vid + 1 - } - - m.Save() m.lock.Unlock() //add to vid2machineId map, and Writers array for _, v := range machine.Volumes { - log.Println("Setting volume", v.Id, "to", machine.Server.Url) + //log.Println("Setting volume", v.Id, "to", machine.Server.Url) m.vid2machineId[v.Id] = machineId if v.Size < ChunkSizeLimit { m.Writers = append(m.Writers, machineId) @@ -152,20 +112,6 @@ func (m *Mapper) Add(machine Machine) []uint32 { } } m.Writers = Writers - - log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.Writers)) - return *vids -} -func (m *Mapper) Save() { - log.Println("Saving virtual to physical:", path.Join(m.dir, m.fileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Mapping File Save [ERROR] %s\n", e) - } - defer dataFile.Close() - encoder := gob.NewEncoder(dataFile) - encoder.Encode(m.Machines) - encoder.Encode(m.GlobalVolumeSequence) } func (m *Mapper) saveSequence() { log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 14f69bf1d..f4f71e541 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -2,7 +2,6 @@ package storage import ( "log" - "io/ioutil" "json" "os" "strings" @@ -13,7 +12,6 @@ import ( type Store struct { volumes map[uint64]*Volume - capacity int dir string Port int PublicServer string @@ -23,35 +21,48 @@ type VolumeInfo struct { Size int64 } -func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { - s = &Store{Port: port, PublicServer: publicServer, dir: dirname, capacity: capacity} +func NewStore(port int, publicServer, dirname string, volumeListString string) (s *Store) { + s = &Store{Port: port, PublicServer: publicServer, dir: dirname} s.volumes = make(map[uint64]*Volume) - - files, _ := ioutil.ReadDir(dirname) - for _, f := range files { - if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { - continue - } - id, err := strconv.Atoui64(f.Name[0:(strings.LastIndex(f.Name, ".dat"))]) - log.Println("Loading data file name:", f.Name) - if err != nil { - continue + + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string + id, err := strconv.Atoui64(id_string) + if err != nil { + log.Println("Volume Id", id_string, "is not a valid unsigned integer! Skipping it...") + continue + } + s.volumes[id] = NewVolume(s.dir, uint32(id)) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.Atoui64(pair[0]) + if start_err != nil { + log.Println("Volume Id", pair[0], "is not a valid unsigned integer! Skipping it...") + continue + } + end, end_err := strconv.Atoui64(pair[1]) + if end_err != nil { + log.Println("Volume Id", pair[1], "is not a valid unsigned integer! Skipping it...") + continue + } + for id := start; id<=end; id++ { + s.volumes[id] = NewVolume(s.dir, uint32(id)) + } } - s.volumes[id] = NewVolume(s.dir, uint32(id)) } log.Println("Store started on dir:", dirname, "with", len(s.volumes), "existing volumes") - log.Println("Expected capacity=", s.capacity, "volumes") return } -func (s *Store) Status()(*[]*VolumeInfo){ - stats := new([]*VolumeInfo) - for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size = uint32(k), v.Size() - *stats = append(*stats, s) - } - return stats +func (s *Store) Status() *[]*VolumeInfo { + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size = uint32(k), v.Size() + *stats = append(*stats, s) + } + return stats } func (s *Store) Join(mserver string) { stats := new([]*VolumeInfo) @@ -65,29 +76,17 @@ func (s *Store) Join(mserver string) { values.Add("port", strconv.Itoa(s.Port)) values.Add("publicServer", s.PublicServer) values.Add("volumes", string(bytes)) - log.Println("Registering exiting volumes", string(bytes)) - values.Add("capacity", strconv.Itoa(s.capacity)) - retString := util.Post("http://"+mserver+"/dir/join", values) - if retString != nil { - newVids := new([]int) - log.Println("Instructed to create volume", string(retString)) - e := json.Unmarshal(retString, newVids) - if e == nil { - for _, vid := range *newVids { - s.volumes[uint64(vid)] = NewVolume(s.dir, uint32(vid)) - log.Println("Adding volume", vid) - } - } - } + log.Println("Exiting volumes", string(bytes)) + util.Post("http://"+mserver+"/dir/join", values) } func (s *Store) Close() { for _, v := range s.volumes { v.Close() } } -func (s *Store) Write(i uint64, n *Needle) (uint32){ +func (s *Store) Write(i uint64, n *Needle) uint32 { return s.volumes[i].write(n) } -func (s *Store) Read(i uint64, n *Needle) (int, os.Error){ +func (s *Store) Read(i uint64, n *Needle) (int, os.Error) { return s.volumes[i].read(n) }