You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

388 lines
12 KiB

13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
12 years ago
  1. package main
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/operation"
  5. "code.google.com/p/weed-fs/go/replication"
  6. "code.google.com/p/weed-fs/go/storage"
  7. "github.com/gorilla/mux"
  8. "math/rand"
  9. "mime"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "runtime"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. func init() {
  19. cmdVolume.Run = runVolume // break init cycle
  20. cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
  21. }
  22. var cmdVolume = &Command{
  23. UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
  24. Short: "start a volume server",
  25. Long: `start a volume server to provide storage spaces
  26. `,
  27. }
  28. var (
  29. vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
  30. volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
  31. maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
  32. ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
  33. publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
  34. masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
  35. vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
  36. vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.")
  37. vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  38. dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
  39. rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
  40. volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
  41. store *storage.Store
  42. volumeWhiteList []string
  43. )
  44. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  45. func statusHandler(w http.ResponseWriter, r *http.Request) {
  46. m := make(map[string]interface{})
  47. m["Version"] = VERSION
  48. m["Volumes"] = store.Status()
  49. writeJsonQuiet(w, r, m)
  50. }
  51. func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
  52. err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType"))
  53. if err == nil {
  54. writeJsonQuiet(w, r, map[string]string{"error": ""})
  55. } else {
  56. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  57. }
  58. debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
  59. }
  60. func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
  61. err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
  62. if err == nil {
  63. writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
  64. } else {
  65. writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
  66. }
  67. debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
  68. }
  69. func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
  70. err := store.CompactVolume(r.FormValue("volume"))
  71. if err == nil {
  72. writeJsonQuiet(w, r, map[string]string{"error": ""})
  73. } else {
  74. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  75. }
  76. debug("compacted volume =", r.FormValue("volume"), ", error =", err)
  77. }
  78. func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
  79. err := store.CommitCompactVolume(r.FormValue("volume"))
  80. if err == nil {
  81. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  82. } else {
  83. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  84. }
  85. debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
  86. }
  87. func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
  88. //TODO: notify master that this volume will be read-only
  89. err := store.FreezeVolume(r.FormValue("volume"))
  90. if err == nil {
  91. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  92. } else {
  93. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  94. }
  95. debug("freeze volume =", r.FormValue("volume"), ", error =", err)
  96. }
  97. func submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) {
  98. submitForClientHandler(w, r, *masterNode)
  99. }
  100. func storeHandler(w http.ResponseWriter, r *http.Request) {
  101. switch r.Method {
  102. case "GET":
  103. GetOrHeadHandler(w, r, true)
  104. case "HEAD":
  105. GetOrHeadHandler(w, r, false)
  106. case "DELETE":
  107. secure(volumeWhiteList, DeleteHandler)(w, r)
  108. case "PUT":
  109. secure(volumeWhiteList, PostHandler)(w, r)
  110. case "POST":
  111. secure(volumeWhiteList, PostHandler)(w, r)
  112. }
  113. }
  114. func GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  115. n := new(storage.Needle)
  116. vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
  117. volumeId, err := storage.NewVolumeId(vid)
  118. if err != nil {
  119. debug("parsing error:", err, r.URL.Path)
  120. return
  121. }
  122. n.ParsePath(fid)
  123. debug("volume", volumeId, "reading", n)
  124. if !store.HasVolume(volumeId) {
  125. lookupResult, err := operation.Lookup(*masterNode, volumeId)
  126. debug("volume", volumeId, "found on", lookupResult, "error", err)
  127. if err == nil {
  128. http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  129. } else {
  130. debug("lookup error:", err, r.URL.Path)
  131. w.WriteHeader(http.StatusNotFound)
  132. }
  133. return
  134. }
  135. cookie := n.Cookie
  136. count, e := store.Read(volumeId, n)
  137. debug("read bytes", count, "error", e)
  138. if e != nil || count <= 0 {
  139. debug("read error:", e, r.URL.Path)
  140. w.WriteHeader(http.StatusNotFound)
  141. return
  142. }
  143. if n.Cookie != cookie {
  144. glog.V(0).Infoln("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  145. w.WriteHeader(http.StatusNotFound)
  146. return
  147. }
  148. if n.LastModified != 0 {
  149. w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  150. if r.Header.Get("If-Modified-Since") != "" {
  151. if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
  152. if t.Unix() >= int64(n.LastModified) {
  153. w.WriteHeader(http.StatusNotModified)
  154. return
  155. }
  156. }
  157. }
  158. }
  159. if n.NameSize > 0 && filename == "" {
  160. filename = string(n.Name)
  161. dotIndex := strings.LastIndex(filename, ".")
  162. if dotIndex > 0 {
  163. ext = filename[dotIndex:]
  164. }
  165. }
  166. mtype := ""
  167. if ext != "" {
  168. mtype = mime.TypeByExtension(ext)
  169. }
  170. if n.MimeSize > 0 {
  171. mtype = string(n.Mime)
  172. }
  173. if mtype != "" {
  174. w.Header().Set("Content-Type", mtype)
  175. }
  176. if filename != "" {
  177. w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename))
  178. }
  179. if ext != ".gz" {
  180. if n.IsGzipped() {
  181. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  182. w.Header().Set("Content-Encoding", "gzip")
  183. } else {
  184. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  185. debug("lookup error:", err, r.URL.Path)
  186. }
  187. }
  188. }
  189. }
  190. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  191. if isGetMethod {
  192. if _, e = w.Write(n.Data); e != nil {
  193. debug("response write error:", e)
  194. }
  195. }
  196. }
  197. func PostHandler(w http.ResponseWriter, r *http.Request) {
  198. m := make(map[string]interface{})
  199. if e := r.ParseForm(); e != nil {
  200. debug("form parse error:", e)
  201. writeJsonError(w, r, e)
  202. return
  203. }
  204. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  205. volumeId, ve := storage.NewVolumeId(vid)
  206. if ve != nil {
  207. debug("NewVolumeId error:", ve)
  208. writeJsonError(w, r, ve)
  209. return
  210. }
  211. needle, ne := storage.NewNeedle(r)
  212. if ne != nil {
  213. writeJsonError(w, r, ne)
  214. return
  215. }
  216. ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r)
  217. if errorStatus == "" {
  218. w.WriteHeader(http.StatusCreated)
  219. } else {
  220. w.WriteHeader(http.StatusInternalServerError)
  221. m["error"] = errorStatus
  222. }
  223. m["size"] = ret
  224. writeJsonQuiet(w, r, m)
  225. }
  226. func DeleteHandler(w http.ResponseWriter, r *http.Request) {
  227. n := new(storage.Needle)
  228. vid, fid, _, _, _ := parseURLPath(r.URL.Path)
  229. volumeId, _ := storage.NewVolumeId(vid)
  230. n.ParsePath(fid)
  231. debug("deleting", n)
  232. cookie := n.Cookie
  233. count, ok := store.Read(volumeId, n)
  234. if ok != nil {
  235. m := make(map[string]uint32)
  236. m["size"] = 0
  237. writeJsonQuiet(w, r, m)
  238. return
  239. }
  240. if n.Cookie != cookie {
  241. glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  242. return
  243. }
  244. n.Size = 0
  245. ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r)
  246. if ret != 0 {
  247. w.WriteHeader(http.StatusAccepted)
  248. } else {
  249. w.WriteHeader(http.StatusInternalServerError)
  250. }
  251. m := make(map[string]uint32)
  252. m["size"] = uint32(count)
  253. writeJsonQuiet(w, r, m)
  254. }
  255. func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) {
  256. switch strings.Count(path, "/") {
  257. case 3:
  258. parts := strings.Split(path, "/")
  259. vid, fid, filename = parts[1], parts[2], parts[3]
  260. ext = filepath.Ext(filename)
  261. case 2:
  262. parts := strings.Split(path, "/")
  263. vid, fid = parts[1], parts[2]
  264. dotIndex := strings.LastIndex(fid, ".")
  265. if dotIndex > 0 {
  266. ext = fid[dotIndex:]
  267. fid = fid[0:dotIndex]
  268. }
  269. default:
  270. sepIndex := strings.LastIndex(path, "/")
  271. commaIndex := strings.LastIndex(path[sepIndex:], ",")
  272. if commaIndex <= 0 {
  273. vid, isVolumeIdOnly = path[sepIndex+1:], true
  274. return
  275. }
  276. dotIndex := strings.LastIndex(path[sepIndex:], ".")
  277. vid = path[sepIndex+1 : commaIndex]
  278. fid = path[commaIndex+1:]
  279. ext = ""
  280. if dotIndex > 0 {
  281. fid = path[commaIndex+1 : dotIndex]
  282. ext = path[dotIndex:]
  283. }
  284. }
  285. return
  286. }
  287. func runVolume(cmd *Command, args []string) bool {
  288. if *vMaxCpu < 1 {
  289. *vMaxCpu = runtime.NumCPU()
  290. }
  291. runtime.GOMAXPROCS(*vMaxCpu)
  292. folders := strings.Split(*volumeFolders, ",")
  293. maxCountStrings := strings.Split(*maxVolumeCounts, ",")
  294. maxCounts := make([]int, 0)
  295. for _, maxString := range maxCountStrings {
  296. if max, e := strconv.Atoi(maxString); e == nil {
  297. maxCounts = append(maxCounts, max)
  298. } else {
  299. glog.Fatalf("The max specified in -max not a valid number %s", max)
  300. }
  301. }
  302. if len(folders) != len(maxCounts) {
  303. glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
  304. }
  305. for _, folder := range folders {
  306. fileInfo, err := os.Stat(folder)
  307. if err != nil {
  308. glog.Fatalf("No Existing Folder:%s", folder)
  309. }
  310. if !fileInfo.IsDir() {
  311. glog.Fatalf("Volume Folder should not be a file:%s", folder)
  312. }
  313. perm := fileInfo.Mode().Perm()
  314. glog.V(0).Infoln("Volume Folder", folder)
  315. glog.V(0).Infoln("Permission:", perm)
  316. }
  317. if *publicUrl == "" {
  318. *publicUrl = *ip + ":" + strconv.Itoa(*vport)
  319. }
  320. if *volumeWhiteListOption != "" {
  321. volumeWhiteList = strings.Split(*volumeWhiteListOption, ",")
  322. }
  323. store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts)
  324. defer store.Close()
  325. r := mux.NewRouter()
  326. r.HandleFunc("/submit", secure(volumeWhiteList, submitFromVolumeServerHandler))
  327. r.HandleFunc("/status", secure(volumeWhiteList, statusHandler))
  328. r.HandleFunc("/admin/assign_volume", secure(volumeWhiteList, assignVolumeHandler))
  329. r.HandleFunc("/admin/vacuum_volume_check", secure(volumeWhiteList, vacuumVolumeCheckHandler))
  330. r.HandleFunc("/admin/vacuum_volume_compact", secure(volumeWhiteList, vacuumVolumeCompactHandler))
  331. r.HandleFunc("/admin/vacuum_volume_commit", secure(volumeWhiteList, vacuumVolumeCommitHandler))
  332. r.HandleFunc("/admin/freeze_volume", secure(volumeWhiteList, freezeVolumeHandler))
  333. r.HandleFunc("/", storeHandler)
  334. go func() {
  335. connected := true
  336. store.SetMaster(*masterNode)
  337. store.SetDataCenter(*dataCenter)
  338. store.SetRack(*rack)
  339. for {
  340. err := store.Join()
  341. if err == nil {
  342. if !connected {
  343. connected = true
  344. glog.V(0).Infoln("Reconnected with master")
  345. }
  346. } else {
  347. if connected {
  348. connected = false
  349. }
  350. }
  351. time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
  352. }
  353. }()
  354. glog.V(0).Infoln("store joined at", *masterNode)
  355. glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
  356. srv := &http.Server{
  357. Addr: ":" + strconv.Itoa(*vport),
  358. Handler: r,
  359. ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
  360. }
  361. e := srv.ListenAndServe()
  362. if e != nil {
  363. glog.Fatalf("Fail to start:%s", e.Error())
  364. }
  365. return true
  366. }