You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

291 lines
9.7 KiB

13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
12 years ago
12 years ago
13 years ago
13 years ago
  1. package main
  import (
  	"bytes"
  	"encoding/json"
  	"errors"
  	"net"
  	"net/http"
  	"os"
  	"runtime"
  	"strconv"
  	"strings"
  	"sync"
  	"time"

  	"code.google.com/p/weed-fs/go/glog"
  	"code.google.com/p/weed-fs/go/operation"
  	"code.google.com/p/weed-fs/go/replication"
  	"code.google.com/p/weed-fs/go/storage"
  	"code.google.com/p/weed-fs/go/topology"
  )
  19. func init() {
  20. cmdMaster.Run = runMaster // break init cycle
  21. cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
  22. }
  23. var cmdMaster = &Command{
  24. UsageLine: "master -port=9333",
  25. Short: "start a master server",
  26. Long: `start a master server to provide volume=>location mapping service
  27. and sequence number of file ids
  28. `,
  29. }
  30. var (
  31. mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
  32. metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store mappings")
  33. volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
  34. mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
  35. confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
  36. defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
  37. mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
  38. mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  39. garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
  40. masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
  41. masterWhiteList []string
  42. )
  43. var topo *topology.Topology
  44. var vg *replication.VolumeGrowth
  45. var vgLock sync.Mutex
  46. func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
  47. vid := r.FormValue("volumeId")
  48. commaSep := strings.Index(vid, ",")
  49. if commaSep > 0 {
  50. vid = vid[0:commaSep]
  51. }
  52. volumeId, err := storage.NewVolumeId(vid)
  53. if err == nil {
  54. machines := topo.Lookup(volumeId)
  55. if machines != nil {
  56. ret := []map[string]string{}
  57. for _, dn := range machines {
  58. ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
  59. }
  60. writeJsonQuiet(w, r, map[string]interface{}{"locations": ret})
  61. } else {
  62. w.WriteHeader(http.StatusNotFound)
  63. writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  64. }
  65. } else {
  66. w.WriteHeader(http.StatusNotAcceptable)
  67. writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid})
  68. }
  69. }
  70. func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
  71. c, e := strconv.Atoi(r.FormValue("count"))
  72. if e != nil {
  73. c = 1
  74. }
  75. repType := r.FormValue("replication")
  76. if repType == "" {
  77. repType = *defaultRepType
  78. }
  79. dataCenter := r.FormValue("dataCenter")
  80. rt, err := storage.NewReplicationTypeFromString(repType)
  81. if err != nil {
  82. w.WriteHeader(http.StatusNotAcceptable)
  83. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  84. return
  85. }
  86. if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
  87. if topo.FreeSpace() <= 0 {
  88. w.WriteHeader(http.StatusNotFound)
  89. writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
  90. return
  91. } else {
  92. vgLock.Lock()
  93. defer vgLock.Unlock()
  94. if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
  95. if _, err = vg.AutomaticGrowByType(rt, dataCenter, topo); err != nil {
  96. writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
  97. return
  98. }
  99. }
  100. }
  101. }
  102. fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter)
  103. if err == nil {
  104. writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
  105. } else {
  106. w.WriteHeader(http.StatusNotAcceptable)
  107. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  108. }
  109. }
  110. func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  111. init := r.FormValue("init") == "true"
  112. ip := r.FormValue("ip")
  113. if ip == "" {
  114. ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
  115. }
  116. port, _ := strconv.Atoi(r.FormValue("port"))
  117. maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
  118. s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
  119. publicUrl := r.FormValue("publicUrl")
  120. volumes := new([]storage.VolumeInfo)
  121. if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil {
  122. writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()})
  123. return
  124. }
  125. debug(s, "volumes", r.FormValue("volumes"))
  126. topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
  127. m := make(map[string]interface{})
  128. m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
  129. writeJsonQuiet(w, r, m)
  130. }
  131. func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  132. m := make(map[string]interface{})
  133. m["Version"] = VERSION
  134. m["Topology"] = topo.ToMap()
  135. writeJsonQuiet(w, r, m)
  136. }
  137. func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  138. gcThreshold := r.FormValue("garbageThreshold")
  139. if gcThreshold == "" {
  140. gcThreshold = *garbageThreshold
  141. }
  142. debug("garbageThreshold =", gcThreshold)
  143. topo.Vacuum(gcThreshold)
  144. dirStatusHandler(w, r)
  145. }
  146. func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  147. count := 0
  148. rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
  149. if err == nil {
  150. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  151. if topo.FreeSpace() < count*rt.GetCopyCount() {
  152. err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
  153. } else {
  154. count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo)
  155. }
  156. } else {
  157. err = errors.New("parameter count is not found")
  158. }
  159. }
  160. if err != nil {
  161. w.WriteHeader(http.StatusNotAcceptable)
  162. writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()})
  163. } else {
  164. w.WriteHeader(http.StatusNotAcceptable)
  165. writeJsonQuiet(w, r, map[string]interface{}{"count": count})
  166. }
  167. }
  168. func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  169. m := make(map[string]interface{})
  170. m["Version"] = VERSION
  171. m["Volumes"] = topo.ToVolumeMap()
  172. writeJsonQuiet(w, r, m)
  173. }
  174. func redirectHandler(w http.ResponseWriter, r *http.Request) {
  175. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  176. volumeId, err := storage.NewVolumeId(vid)
  177. if err != nil {
  178. debug("parsing error:", err, r.URL.Path)
  179. return
  180. }
  181. machines := topo.Lookup(volumeId)
  182. if machines != nil && len(machines) > 0 {
  183. http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  184. } else {
  185. w.WriteHeader(http.StatusNotFound)
  186. writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  187. }
  188. }
  189. func submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  190. submitForClientHandler(w, r, "localhost:"+strconv.Itoa(*mport))
  191. }
  192. func runMaster(cmd *Command, args []string) bool {
  193. if *mMaxCpu < 1 {
  194. *mMaxCpu = runtime.NumCPU()
  195. }
  196. runtime.GOMAXPROCS(*mMaxCpu)
  197. if *masterWhiteListOption != "" {
  198. masterWhiteList = strings.Split(*masterWhiteListOption, ",")
  199. }
  200. var e error
  201. if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed",
  202. uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil {
  203. glog.Fatalf("cannot create topology:%s", e)
  204. }
  205. vg = replication.NewDefaultVolumeGrowth()
  206. glog.V(0).Infoln("Volume Size Limit is", *volumeSizeLimitMB, "MB")
  207. http.HandleFunc("/dir/assign", secure(masterWhiteList, dirAssignHandler))
  208. http.HandleFunc("/dir/lookup", secure(masterWhiteList, dirLookupHandler))
  209. http.HandleFunc("/dir/join", secure(masterWhiteList, dirJoinHandler))
  210. http.HandleFunc("/dir/status", secure(masterWhiteList, dirStatusHandler))
  211. http.HandleFunc("/vol/grow", secure(masterWhiteList, volumeGrowHandler))
  212. http.HandleFunc("/vol/status", secure(masterWhiteList, volumeStatusHandler))
  213. http.HandleFunc("/vol/vacuum", secure(masterWhiteList, volumeVacuumHandler))
  214. http.HandleFunc("/submit", secure(masterWhiteList, submitFromMasterServerHandler))
  215. http.HandleFunc("/", redirectHandler)
  216. topo.StartRefreshWritableVolumes(*garbageThreshold)
  217. glog.V(0).Infoln("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
  218. srv := &http.Server{
  219. Addr: ":" + strconv.Itoa(*mport),
  220. Handler: http.DefaultServeMux,
  221. ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
  222. }
  223. e = srv.ListenAndServe()
  224. if e != nil {
  225. glog.Fatalf("Fail to start:%s", e)
  226. }
  227. return true
  228. }
  229. func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
  230. m := make(map[string]interface{})
  231. if r.Method != "POST" {
  232. m["error"] = "Only submit via POST!"
  233. writeJsonQuiet(w, r, m)
  234. return
  235. }
  236. debug("parsing upload file...")
  237. fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
  238. if pe != nil {
  239. writeJsonError(w, r, pe)
  240. return
  241. }
  242. debug("assigning file id for", fname)
  243. assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"))
  244. if ae != nil {
  245. writeJsonError(w, r, ae)
  246. return
  247. }
  248. url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid
  249. if lastModified != 0 {
  250. url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
  251. }
  252. debug("upload file to store", url)
  253. uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
  254. if err != nil {
  255. writeJsonError(w, r, err)
  256. return
  257. }
  258. m["fileName"] = fname
  259. m["fid"] = assignResult.Fid
  260. m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
  261. m["size"] = uploadResult.Size
  262. writeJsonQuiet(w, r, m)
  263. return
  264. }