You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
7.1 KiB

13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
12 years ago
12 years ago
13 years ago
13 years ago
13 years ago
  1. package main
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "log"
  6. "net/http"
  7. "code.google.com/p/weed-fs/go/replication"
  8. "code.google.com/p/weed-fs/go/storage"
  9. "code.google.com/p/weed-fs/go/topology"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. func init() {
  16. cmdMaster.Run = runMaster // break init cycle
  17. cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
  18. }
  19. var cmdMaster = &Command{
  20. UsageLine: "master -port=9333",
  21. Short: "start a master server",
  22. Long: `start a master server to provide volume=>location mapping service
  23. and sequence number of file ids
  24. `,
  25. }
  26. var (
  27. mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
  28. metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
  29. volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
  30. mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
  31. confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
  32. defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
  33. mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
  34. mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  35. garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
  36. )
  37. var topo *topology.Topology
  38. var vg *replication.VolumeGrowth
  39. func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
  40. vid := r.FormValue("volumeId")
  41. commaSep := strings.Index(vid, ",")
  42. if commaSep > 0 {
  43. vid = vid[0:commaSep]
  44. }
  45. volumeId, err := storage.NewVolumeId(vid)
  46. if err == nil {
  47. machines := topo.Lookup(volumeId)
  48. if machines != nil {
  49. ret := []map[string]string{}
  50. for _, dn := range machines {
  51. ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
  52. }
  53. writeJson(w, r, map[string]interface{}{"locations": ret})
  54. } else {
  55. w.WriteHeader(http.StatusNotFound)
  56. writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  57. }
  58. } else {
  59. w.WriteHeader(http.StatusNotAcceptable)
  60. writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
  61. }
  62. }
  63. func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
  64. c, e := strconv.Atoi(r.FormValue("count"))
  65. if e != nil {
  66. c = 1
  67. }
  68. repType := r.FormValue("replication")
  69. if repType == "" {
  70. repType = *defaultRepType
  71. }
  72. rt, err := storage.NewReplicationTypeFromString(repType)
  73. if err != nil {
  74. w.WriteHeader(http.StatusNotAcceptable)
  75. writeJson(w, r, map[string]string{"error": err.Error()})
  76. return
  77. }
  78. if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
  79. if topo.FreeSpace() <= 0 {
  80. w.WriteHeader(http.StatusNotFound)
  81. writeJson(w, r, map[string]string{"error": "No free volumes left!"})
  82. return
  83. } else {
  84. vg.GrowByType(rt, topo)
  85. }
  86. }
  87. fid, count, dn, err := topo.PickForWrite(rt, c)
  88. if err == nil {
  89. writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
  90. } else {
  91. w.WriteHeader(http.StatusNotAcceptable)
  92. writeJson(w, r, map[string]string{"error": err.Error()})
  93. }
  94. }
  95. func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  96. init := r.FormValue("init") == "true"
  97. ip := r.FormValue("ip")
  98. if ip == "" {
  99. ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
  100. }
  101. port, _ := strconv.Atoi(r.FormValue("port"))
  102. maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
  103. s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
  104. publicUrl := r.FormValue("publicUrl")
  105. volumes := new([]storage.VolumeInfo)
  106. json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
  107. debug(s, "volumes", r.FormValue("volumes"))
  108. topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
  109. m := make(map[string]interface{})
  110. m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
  111. writeJson(w, r, m)
  112. }
  113. func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  114. m := make(map[string]interface{})
  115. m["Version"] = VERSION
  116. m["Topology"] = topo.ToMap()
  117. writeJson(w, r, m)
  118. }
  119. func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  120. gcThreshold := r.FormValue("garbageThreshold")
  121. if gcThreshold == "" {
  122. gcThreshold = *garbageThreshold
  123. }
  124. debug("garbageThreshold =", gcThreshold)
  125. topo.Vacuum(gcThreshold)
  126. dirStatusHandler(w, r)
  127. }
  128. func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  129. count := 0
  130. rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
  131. if err == nil {
  132. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  133. if topo.FreeSpace() < count*rt.GetCopyCount() {
  134. err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
  135. } else {
  136. count, err = vg.GrowByCountAndType(count, rt, topo)
  137. }
  138. } else {
  139. err = errors.New("parameter count is not found")
  140. }
  141. }
  142. if err != nil {
  143. w.WriteHeader(http.StatusNotAcceptable)
  144. writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
  145. } else {
  146. w.WriteHeader(http.StatusNotAcceptable)
  147. writeJson(w, r, map[string]interface{}{"count": count})
  148. }
  149. }
  150. func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  151. m := make(map[string]interface{})
  152. m["Version"] = VERSION
  153. m["Volumes"] = topo.ToVolumeMap()
  154. writeJson(w, r, m)
  155. }
  156. func redirectHandler(w http.ResponseWriter, r *http.Request) {
  157. vid, _, _ := parseURLPath(r.URL.Path)
  158. volumeId, err := storage.NewVolumeId(vid)
  159. if err != nil {
  160. debug("parsing error:", err, r.URL.Path)
  161. return
  162. }
  163. machines := topo.Lookup(volumeId)
  164. if machines != nil && len(machines) > 0 {
  165. http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  166. } else {
  167. w.WriteHeader(http.StatusNotFound)
  168. writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  169. }
  170. }
  171. func runMaster(cmd *Command, args []string) bool {
  172. if *mMaxCpu < 1 {
  173. *mMaxCpu = runtime.NumCPU()
  174. }
  175. runtime.GOMAXPROCS(*mMaxCpu)
  176. topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
  177. vg = replication.NewDefaultVolumeGrowth()
  178. log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
  179. http.HandleFunc("/dir/assign", dirAssignHandler)
  180. http.HandleFunc("/dir/lookup", dirLookupHandler)
  181. http.HandleFunc("/dir/join", dirJoinHandler)
  182. http.HandleFunc("/dir/status", dirStatusHandler)
  183. http.HandleFunc("/vol/grow", volumeGrowHandler)
  184. http.HandleFunc("/vol/status", volumeStatusHandler)
  185. http.HandleFunc("/vol/vacuum", volumeVacuumHandler)
  186. http.HandleFunc("/", redirectHandler)
  187. topo.StartRefreshWritableVolumes(*garbageThreshold)
  188. log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
  189. srv := &http.Server{
  190. Addr: ":" + strconv.Itoa(*mport),
  191. Handler: http.DefaultServeMux,
  192. ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
  193. }
  194. e := srv.ListenAndServe()
  195. if e != nil {
  196. log.Fatalf("Fail to start:%s", e.Error())
  197. }
  198. return true
  199. }