You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

302 lines
10 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/glog"
  5. "code.google.com/p/weed-fs/go/operation"
  6. "code.google.com/p/weed-fs/go/replication"
  7. "code.google.com/p/weed-fs/go/sequence"
  8. "code.google.com/p/weed-fs/go/storage"
  9. "code.google.com/p/weed-fs/go/topology"
  10. "encoding/json"
  11. "errors"
  12. "net/http"
  13. "os"
  14. "path"
  15. "runtime"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. func init() {
  22. cmdMaster.Run = runMaster // break init cycle
  23. cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
  24. }
// cmdMaster describes the "master" sub-command: its usage line and its short
// and long help texts. Its Run and IsDebug fields are filled in by init.
var cmdMaster = &Command{
	UsageLine: "master -port=9333",
	Short:     "start a master server",
	Long: `start a master server to provide volume=>location mapping service
and sequence number of file ids
`,
}
// Command-line options of the master command, all registered on
// cmdMaster.Flag, plus the white list derived from them.
var (
	// mport is the HTTP listen port of the master.
	mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
	// metaFolder is the directory in which runMaster places the "weed.seq"
	// file of the file-based sequencer.
	metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store mappings")
	// volumeSizeLimitMB is the volume size limit in megabytes; it is converted
	// to bytes before being handed to the topology and to joining nodes.
	volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
	// mpulse is the heartbeat interval in seconds, passed to topology.NewTopology.
	mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
	// confFile is the configuration file path passed to topology.NewTopology.
	confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
	// defaultRepType is used by dirAssignHandler when a request carries no
	// "replication" value.
	defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
	// mReadTimeout is the HTTP server read timeout in seconds.
	mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
	// mMaxCpu caps GOMAXPROCS; values below 1 are replaced by runtime.NumCPU()
	// in runMaster.
	mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
	// garbageThreshold is kept as a string and passed as-is to the topology's
	// vacuum routines.
	garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
	// masterWhiteListOption is the raw comma separated white list; runMaster
	// splits it into masterWhiteList.
	masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
	// etcdCluster selects the etcd-backed sequencer when non-empty; otherwise
	// the file-based sequencer is used.
	etcdCluster = cmdMaster.Flag.String("etcd", "", "comma separated etcd urls, e.g., http://localhost:4001, See github.com/coreos/go-etcd/etcd")
	// masterWhiteList is handed to secure() for the protected endpoints. It
	// stays nil when no white list was given.
	masterWhiteList []string
)
// topo is the cluster topology shared by all HTTP handlers; it is created in
// runMaster before the handlers are registered.
var topo *topology.Topology

// vg is the volume growth strategy used to create new volumes; it is created
// in runMaster.
var vg *replication.VolumeGrowth

// vgLock is taken by dirAssignHandler around its automatic volume growth.
var vgLock sync.Mutex
  49. func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
  50. vid := r.FormValue("volumeId")
  51. collection := r.FormValue("collection") //optional, but can be faster if too many collections
  52. commaSep := strings.Index(vid, ",")
  53. if commaSep > 0 {
  54. vid = vid[0:commaSep]
  55. }
  56. volumeId, err := storage.NewVolumeId(vid)
  57. if err == nil {
  58. machines := topo.Lookup(collection, volumeId)
  59. if machines != nil {
  60. ret := []map[string]string{}
  61. for _, dn := range machines {
  62. ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
  63. }
  64. writeJsonQuiet(w, r, map[string]interface{}{"locations": ret})
  65. } else {
  66. w.WriteHeader(http.StatusNotFound)
  67. writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  68. }
  69. } else {
  70. w.WriteHeader(http.StatusNotAcceptable)
  71. writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid})
  72. }
  73. }
  74. func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
  75. c, e := strconv.Atoi(r.FormValue("count"))
  76. if e != nil {
  77. c = 1
  78. }
  79. repType := r.FormValue("replication")
  80. if repType == "" {
  81. repType = *defaultRepType
  82. }
  83. collection := r.FormValue("collection")
  84. dataCenter := r.FormValue("dataCenter")
  85. rt, err := storage.NewReplicationTypeFromString(repType)
  86. if err != nil {
  87. w.WriteHeader(http.StatusNotAcceptable)
  88. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  89. return
  90. }
  91. if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
  92. if topo.FreeSpace() <= 0 {
  93. w.WriteHeader(http.StatusNotFound)
  94. writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
  95. return
  96. } else {
  97. vgLock.Lock()
  98. defer vgLock.Unlock()
  99. if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
  100. if _, err = vg.AutomaticGrowByType(collection, rt, dataCenter, topo); err != nil {
  101. writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
  102. return
  103. }
  104. }
  105. }
  106. }
  107. fid, count, dn, err := topo.PickForWrite(collection, rt, c, dataCenter)
  108. if err == nil {
  109. writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
  110. } else {
  111. w.WriteHeader(http.StatusNotAcceptable)
  112. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  113. }
  114. }
  115. func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
  116. init := r.FormValue("init") == "true"
  117. ip := r.FormValue("ip")
  118. if ip == "" {
  119. ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
  120. }
  121. port, _ := strconv.Atoi(r.FormValue("port"))
  122. maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
  123. s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
  124. publicUrl := r.FormValue("publicUrl")
  125. volumes := new([]storage.VolumeInfo)
  126. if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil {
  127. writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()})
  128. return
  129. }
  130. debug(s, "volumes", r.FormValue("volumes"))
  131. topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
  132. m := make(map[string]interface{})
  133. m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
  134. writeJsonQuiet(w, r, m)
  135. }
  136. func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
  137. m := make(map[string]interface{})
  138. m["Version"] = VERSION
  139. m["Topology"] = topo.ToMap()
  140. writeJsonQuiet(w, r, m)
  141. }
  142. func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
  143. gcThreshold := r.FormValue("garbageThreshold")
  144. if gcThreshold == "" {
  145. gcThreshold = *garbageThreshold
  146. }
  147. debug("garbageThreshold =", gcThreshold)
  148. topo.Vacuum(gcThreshold)
  149. dirStatusHandler(w, r)
  150. }
  151. func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
  152. count := 0
  153. rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
  154. if err == nil {
  155. if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
  156. if topo.FreeSpace() < count*rt.GetCopyCount() {
  157. err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
  158. } else {
  159. count, err = vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), topo)
  160. }
  161. } else {
  162. err = errors.New("parameter count is not found")
  163. }
  164. }
  165. if err != nil {
  166. w.WriteHeader(http.StatusNotAcceptable)
  167. writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()})
  168. } else {
  169. w.WriteHeader(http.StatusNotAcceptable)
  170. writeJsonQuiet(w, r, map[string]interface{}{"count": count})
  171. }
  172. }
  173. func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
  174. m := make(map[string]interface{})
  175. m["Version"] = VERSION
  176. m["Volumes"] = topo.ToVolumeMap()
  177. writeJsonQuiet(w, r, m)
  178. }
  179. func redirectHandler(w http.ResponseWriter, r *http.Request) {
  180. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  181. volumeId, err := storage.NewVolumeId(vid)
  182. if err != nil {
  183. debug("parsing error:", err, r.URL.Path)
  184. return
  185. }
  186. machines := topo.Lookup("", volumeId)
  187. if machines != nil && len(machines) > 0 {
  188. http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  189. } else {
  190. w.WriteHeader(http.StatusNotFound)
  191. writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
  192. }
  193. }
  194. func submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
  195. submitForClientHandler(w, r, "localhost:"+strconv.Itoa(*mport))
  196. }
  197. func runMaster(cmd *Command, args []string) bool {
  198. if *mMaxCpu < 1 {
  199. *mMaxCpu = runtime.NumCPU()
  200. }
  201. runtime.GOMAXPROCS(*mMaxCpu)
  202. if *masterWhiteListOption != "" {
  203. masterWhiteList = strings.Split(*masterWhiteListOption, ",")
  204. }
  205. var seq sequence.Sequencer
  206. if len(*etcdCluster) == 0 {
  207. seq = sequence.NewFileSequencer(path.Join(*metaFolder, "weed.seq"))
  208. } else {
  209. seq = sequence.NewEtcdSequencer(*etcdCluster)
  210. }
  211. var e error
  212. if topo, e = topology.NewTopology("topo", *confFile, seq,
  213. uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil {
  214. glog.Fatalf("cannot create topology:%s", e)
  215. }
  216. vg = replication.NewDefaultVolumeGrowth()
  217. glog.V(0).Infoln("Volume Size Limit is", *volumeSizeLimitMB, "MB")
  218. http.HandleFunc("/dir/assign", secure(masterWhiteList, dirAssignHandler))
  219. http.HandleFunc("/dir/lookup", secure(masterWhiteList, dirLookupHandler))
  220. http.HandleFunc("/dir/join", secure(masterWhiteList, dirJoinHandler))
  221. http.HandleFunc("/dir/status", secure(masterWhiteList, dirStatusHandler))
  222. http.HandleFunc("/vol/grow", secure(masterWhiteList, volumeGrowHandler))
  223. http.HandleFunc("/vol/status", secure(masterWhiteList, volumeStatusHandler))
  224. http.HandleFunc("/vol/vacuum", secure(masterWhiteList, volumeVacuumHandler))
  225. http.HandleFunc("/submit", secure(masterWhiteList, submitFromMasterServerHandler))
  226. http.HandleFunc("/", redirectHandler)
  227. topo.StartRefreshWritableVolumes(*garbageThreshold)
  228. glog.V(0).Infoln("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
  229. srv := &http.Server{
  230. Addr: ":" + strconv.Itoa(*mport),
  231. Handler: http.DefaultServeMux,
  232. ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
  233. }
  234. e = srv.ListenAndServe()
  235. if e != nil {
  236. glog.Fatalf("Fail to start:%s", e)
  237. }
  238. return true
  239. }
  240. func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
  241. m := make(map[string]interface{})
  242. if r.Method != "POST" {
  243. m["error"] = "Only submit via POST!"
  244. writeJsonQuiet(w, r, m)
  245. return
  246. }
  247. debug("parsing upload file...")
  248. fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
  249. if pe != nil {
  250. writeJsonError(w, r, pe)
  251. return
  252. }
  253. debug("assigning file id for", fname)
  254. assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"))
  255. if ae != nil {
  256. writeJsonError(w, r, ae)
  257. return
  258. }
  259. url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid
  260. if lastModified != 0 {
  261. url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
  262. }
  263. debug("upload file to store", url)
  264. uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
  265. if err != nil {
  266. writeJsonError(w, r, err)
  267. return
  268. }
  269. m["fileName"] = fname
  270. m["fid"] = assignResult.Fid
  271. m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
  272. m["size"] = uploadResult.Size
  273. writeJsonQuiet(w, r, m)
  274. return
  275. }