diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 45261f017..3bafccd14 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -1,177 +1,176 @@ package main import ( - "log" - "math/rand" - "mime" - "net/http" - "pkg/storage" - "strconv" - "strings" - "time" + "log" + "math/rand" + "mime" + "net/http" + "pkg/storage" + "strconv" + "strings" + "time" ) func init() { - cmdVolume.Run = runVolume // break init cycle - IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") - port = cmdVolume.Flag.Int("port", 8080, "http listen port") + cmdVolume.Run = runVolume // break init cycle + IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") + port = cmdVolume.Flag.Int("port", 8080, "http listen port") } var cmdVolume = &Command{ - UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333", - Short: "start a volume server", - Long: `start a volume server to provide storage spaces + UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333", + Short: "start a volume server", + Long: `start a volume server to provide storage spaces `, } var ( - chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") - volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") - publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") - metaServer = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") - pulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - - store *storage.Store + chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files") + volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") + publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read") + metaServer = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings") + pulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + store *storage.Store ) func statusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, store.Status()) + writeJson(w, r, store.Status()) } func addVolumeHandler(w http.ResponseWriter, r *http.Request) { - store.AddVolume(r.FormValue("volume")) - writeJson(w, r, store.Status()) + store.AddVolume(r.FormValue("volume")) + writeJson(w, r, store.Status()) } func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetHandler(w, r) - case "DELETE": - DeleteHandler(w, r) - case "POST": - PostHandler(w, r) - } + switch r.Method { + case "GET": + GetHandler(w, r) + case "DELETE": + DeleteHandler(w, r) + case "POST": + PostHandler(w, r) + } } func GetHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, ext := parseURLPath(r.URL.Path) - volumeId, _ := strconv.ParseUint(vid, 10, 64) - n.ParsePath(fid) - - if *IsDebug { - log.Println("volume", volumeId, "reading", n) - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - if *IsDebug { - log.Println("read bytes", count, "error", e) - } - if n.Cookie != cookie { - log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - if ext != "" { - mtype := mime.TypeByExtension(ext) - w.Header().Set("Content-Type", mtype) - if storage.IsCompressable(ext, mtype){ - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip"){ - w.Header().Set("Content-Encoding", "gzip") - }else{ - n.Data = storage.UnGzipData(n.Data) - } - } - } - w.Write(n.Data) + n := new(storage.Needle) + vid, fid, ext := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + if *IsDebug { + log.Println("volume", volumeId, "reading", n) + } + cookie := n.Cookie + count, e := store.Read(volumeId, n) + if *IsDebug { + log.Println("read bytes", count, "error", e) + } + if n.Cookie != cookie { + log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + if ext != "" { + mtype := mime.TypeByExtension(ext) + w.Header().Set("Content-Type", mtype) + if storage.IsCompressable(ext, mtype) { + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + } else { + n.Data = storage.UnGzipData(n.Data) + } + } + } + w.Write(n.Data) } func PostHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := strconv.ParseUint(vid, 10, 64) - if e != nil { - writeJson(w, r, e) - } else { - needle, ne := storage.NewNeedle(r) - if ne != nil { - writeJson(w, r, ne) - } else { - ret := store.Write(volumeId, needle) - m := make(map[string]uint32) - m["size"] = ret - writeJson(w, r, m) - } - } + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, e := storage.NewVolumeId(vid) + if e != nil { + writeJson(w, r, e) + } else { + needle, ne := storage.NewNeedle(r) + if ne != nil { + writeJson(w, r, ne) + } else { + ret := store.Write(volumeId, needle) + m := make(map[string]uint32) + m["size"] = ret + writeJson(w, r, m) + } + } } func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := strconv.ParseUint(vid, 10, 64) - n.ParsePath(fid) - - if *IsDebug { - log.Println("deleting", n) - } - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJson(w, r, m) - return - } - - if n.Cookie != cookie { - log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - store.Delete(volumeId, n) - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJson(w, r, m) + n := new(storage.Needle) + vid, fid, _ := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + if *IsDebug { + log.Println("deleting", n) + } + + cookie := n.Cookie + count, ok := store.Read(volumeId, n) + + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJson(w, r, m) + return + } + + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + store.Delete(volumeId, n) + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJson(w, r, m) } func parseURLPath(path string) (vid, fid, ext string) { - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - if "favicon.ico" != path[sepIndex+1:] { - log.Println("unknown file id", path[sepIndex+1:]) - } - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - return + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + if "favicon.ico" != path[sepIndex+1:] { + log.Println("unknown file id", path[sepIndex+1:]) + } + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + return } func runVolume(cmd *Command, args []string) bool { - //TODO: now default to 1G, this value should come from server? - store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes) - defer store.Close() - http.HandleFunc("/", storeHandler) - http.HandleFunc("/status", statusHandler) - http.HandleFunc("/add_volume", addVolumeHandler) - - go func() { - for { - store.Join(*metaServer) - time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - log.Println("store joined at", *metaServer) - - log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl) - e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) - if e != nil { - log.Fatalf("Fail to start:", e) - } - return true + //TODO: now default to 1G, this value should come from server? + store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes) + defer store.Close() + http.HandleFunc("/", storeHandler) + http.HandleFunc("/status", statusHandler) + http.HandleFunc("/add_volume", addVolumeHandler) + + go func() { + for { + store.Join(*metaServer) + time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + log.Println("store joined at", *metaServer) + + log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl) + e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) + if e != nil { + log.Fatalf("Fail to start:", e) + } + return true } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 5a686d2db..cd32a0ccf 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -12,7 +12,7 @@ import ( ) type Store struct { - volumes map[uint64]*Volume + volumes map[VolumeId]*Volume dir string Port int PublicUrl string @@ -20,7 +20,7 @@ type Store struct { func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *Store) { s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} - s.volumes = make(map[uint64]*Volume) + s.volumes = make(map[VolumeId]*Volume) s.AddVolume(volumeListString) @@ -35,7 +35,7 @@ func (s *Store) AddVolume(volumeListString string) error { if err != nil { return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") } - s.addVolume(id) + s.addVolume(VolumeId(id)) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -47,17 +47,17 @@ func (s *Store) AddVolume(volumeListString string) error { return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") } for id := start; id <= end; id++ { - s.addVolume(id) + s.addVolume(VolumeId(id)) } } } return nil } -func (s *Store) addVolume(vid uint64) error { +func (s *Store) addVolume(vid VolumeId) error { if s.volumes[vid] != nil { - return errors.New("Volume Id " + strconv.FormatUint(vid, 10) + " already exists!") + return errors.New("Volume Id " + vid.String() + " already exists!") } - s.volumes[vid] = NewVolume(s.dir, uint32(vid)) + s.volumes[vid] = NewVolume(s.dir, vid) return nil } func (s *Store) Status() *[]*topology.VolumeInfo { @@ -88,12 +88,12 @@ func (s *Store) Close() { v.Close() } } -func (s *Store) Write(i uint64, n *Needle) uint32 { +func (s *Store) Write(i VolumeId, n *Needle) uint32 { return s.volumes[i].write(n) } -func (s *Store) Delete(i uint64, n *Needle) uint32 { +func (s *Store) Delete(i VolumeId, n *Needle) uint32 { return s.volumes[i].delete(n) } -func (s *Store) Read(i uint64, n *Needle) (int, error) { +func (s *Store) Read(i VolumeId, n *Needle) (int, error) { return s.volumes[i].read(n) } diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 01d93efb7..40e3eaaf4 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -13,8 +13,16 @@ const ( SuperBlockSize = 8 ) +type VolumeId uint32 +func NewVolumeId(vid string) (VolumeId,error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err +} +func (vid *VolumeId) String() string{ + return strconv.FormatUint(uint64(*vid), 10) +} type Volume struct { - Id uint32 + Id VolumeId dir string dataFile *os.File nm *NeedleMap @@ -22,7 +30,7 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, id uint32) (v *Volume) { +func NewVolume(dirname string, id VolumeId) (v *Volume) { var e error v = &Volume{dir: dirname, Id: id} fileName := strconv.FormatUint(uint64(v.Id), 10)