From aab5390cb5104bf781413ade5e6f76e4f3fa802e Mon Sep 17 00:00:00 2001 From: "chris.lu@gmail.com" Date: Tue, 7 Aug 2012 08:31:48 +0000 Subject: [PATCH] all switching to "weed command [args]" usage mode git-svn-id: https://weed-fs.googlecode.com/svn/trunk@66 282b0af5-e82d-9cf1-ede4-77906d7719d0 --- weed-fs/src/cmd/weedmaster/weedmaster.go | 91 --------- .../fix_volume_index/fix_volume_index.go | 59 ------ weed-fs/src/cmd/weedvolume/weedvolume.go | 182 ------------------ 3 files changed, 332 deletions(-) delete mode 100644 weed-fs/src/cmd/weedmaster/weedmaster.go delete mode 100644 weed-fs/src/cmd/weedvolume/fix_volume_index/fix_volume_index.go delete mode 100644 weed-fs/src/cmd/weedvolume/weedvolume.go diff --git a/weed-fs/src/cmd/weedmaster/weedmaster.go b/weed-fs/src/cmd/weedmaster/weedmaster.go deleted file mode 100644 index 3f6b4daa2..000000000 --- a/weed-fs/src/cmd/weedmaster/weedmaster.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "pkg/directory" - "encoding/json" - "flag" - "fmt" - "log" - "net/http" - "pkg/storage" - "strconv" - "strings" -) - -var ( - port = flag.Int("port", 9333, "http listen port") - metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") - capacity = flag.Int("capacity", 100, "maximum number of volumes to hold") - mapper *directory.Mapper - IsDebug = flag.Bool("debug", false, "verbose debug information") - volumeSizeLimitMB = flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") -) - -func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, _ := strconv.ParseUint(vid, 10, 64) - machine, e := mapper.Get(uint32(volumeId)) - if e == nil { - writeJson(w, r, machine.Server) - } else { - log.Println("Invalid volume id", volumeId) - writeJson(w, r, map[string]string{"error": "volume id " + strconv.FormatUint(volumeId, 10) + " not found"}) - } -} -func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c:=r.FormValue("count") - fid, count, machine, err := mapper.PickForWrite(c) - if err == nil { - writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)}) - } else { - log.Println(err) - writeJson(w, r, map[string]string{"error": err.Error()}) - } -} -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - if *IsDebug { - log.Println(s, "volumes", r.FormValue("volumes")) - } - mapper.Add(*directory.NewMachine(s, publicUrl, *volumes)) -} -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, mapper) -} -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { - w.Header().Set("Content-Type", "application/javascript") - bytes, _ := json.Marshal(obj) - callback := r.FormValue("callback") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) - } -} - -func main() { - flag.Parse() - log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024) - http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) - - log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port)) - e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) - if e != nil { - log.Fatal("Fail to start:", e) - } - -} diff --git a/weed-fs/src/cmd/weedvolume/fix_volume_index/fix_volume_index.go b/weed-fs/src/cmd/weedvolume/fix_volume_index/fix_volume_index.go deleted file mode 100644 index 7abd2032f..000000000 --- a/weed-fs/src/cmd/weedvolume/fix_volume_index/fix_volume_index.go +++ /dev/null @@ -1,59 +0,0 @@ -package main - -import ( - "pkg/storage" - "flag" - "log" - "os" - "path" - "strconv" -) - -var ( - dir = flag.String("dir", "/tmp", "data directory to store files") - volumeId = flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.") - IsDebug = flag.Bool("debug", false, "enable debug mode") - - store *storage.Store -) - -func main() { - flag.Parse() - - if *volumeId == -1 { - flag.Usage() - return - } - - fileName := strconv.Itoa(*volumeId) - dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644) - if e != nil { - log.Fatalf("Read Volume [ERROR] %s\n", e) - } - defer dataFile.Close() - indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) - if ie != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", ie) - } - defer indexFile.Close() - - //skip the volume super block - dataFile.Seek(storage.SuperBlockSize, 0) - - n, length := storage.ReadNeedle(dataFile) - nm := storage.NewNeedleMap(indexFile) - offset := uint32(storage.SuperBlockSize) - for n != nil { - if *IsDebug { - log.Println("key", n.Key, "volume offset", offset, "data_size", n.Size, "length", length) - } - if n.Size > 0 { - count, pe := nm.Put(n.Key, offset/8, n.Size) - if *IsDebug { - log.Println("saved", count, "with error", pe) - } - } - offset += length - n, length = storage.ReadNeedle(dataFile) - } -} diff --git a/weed-fs/src/cmd/weedvolume/weedvolume.go b/weed-fs/src/cmd/weedvolume/weedvolume.go deleted file mode 100644 index e2ebfc53f..000000000 --- a/weed-fs/src/cmd/weedvolume/weedvolume.go +++ /dev/null @@ -1,182 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "log" - "math/rand" - "mime" - "net/http" - "pkg/storage" - "strconv" - "strings" - "time" -) - -var ( - port = flag.Int("port", 8080, "http listen port") - chunkFolder = flag.String("dir", "/tmp", "data directory to store files") - volumes = flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids") - publicUrl = flag.String("publicUrl", "localhost:8080", "public url to serve data read") - metaServer = flag.String("mserver", "localhost:9333", "master directory server to store mappings") - IsDebug = flag.Bool("debug", false, "enable debug mode") - pulse = flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - - store *storage.Store -) - -func statusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, store.Status()) -} -func addVolumeHandler(w http.ResponseWriter, r *http.Request) { - store.AddVolume(r.FormValue("volume")) - writeJson(w, r, store.Status()) -} -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetHandler(w, r) - case "DELETE": - DeleteHandler(w, r) - case "POST": - PostHandler(w, r) - } -} -func GetHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, ext := parseURLPath(r.URL.Path) - volumeId, _ := strconv.ParseUint(vid, 10, 64) - n.ParsePath(fid) - - if *IsDebug { - log.Println("volume", volumeId, "reading", n) - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - if *IsDebug { - log.Println("read bytes", count, "error", e) - } - if n.Cookie != cookie { - log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - if ext != "" { - mtype := mime.TypeByExtension(ext) - w.Header().Set("Content-Type", mtype) - if storage.IsCompressable(ext, mtype){ - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip"){ - w.Header().Set("Content-Encoding", "gzip") - }else{ - n.Data = storage.UnGzipData(n.Data) - } - } - } - w.Write(n.Data) -} -func PostHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := strconv.ParseUint(vid, 10, 64) - if e != nil { - writeJson(w, r, e) - } else { - needle, ne := storage.NewNeedle(r) - if ne != nil { - writeJson(w, r, ne) - } else { - ret := store.Write(volumeId, needle) - m := make(map[string]uint32) - m["size"] = ret - writeJson(w, r, m) - } - } -} -func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := strconv.ParseUint(vid, 10, 64) - n.ParsePath(fid) - - if *IsDebug { - log.Println("deleting", n) - } - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJson(w, r, m) - return - } - - if n.Cookie != cookie { - log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - store.Delete(volumeId, n) - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJson(w, r, m) -} -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { - w.Header().Set("Content-Type", "application/javascript") - bytes, _ := json.Marshal(obj) - callback := r.FormValue("callback") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) - } - //log.Println("JSON Response", string(bytes)) -} -func parseURLPath(path string) (vid, fid, ext string) { - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - if "favicon.ico" != path[sepIndex+1:] { - log.Println("unknown file id", path[sepIndex+1:]) - } - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - return -} - -func main() { - flag.Parse() - //TODO: now default to 1G, this value should come from server? - store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes) - defer store.Close() - http.HandleFunc("/", storeHandler) - http.HandleFunc("/status", statusHandler) - http.HandleFunc("/add_volume", addVolumeHandler) - - go func() { - for { - store.Join(*metaServer) - time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - log.Println("store joined at", *metaServer) - - log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl) - e := http.ListenAndServe(":"+strconv.Itoa(*port), nil) - if e != nil { - log.Fatalf("Fail to start:", e) - } - -}