You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

355 lines
11 KiB

13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
13 years ago
  1. package main
  2. import (
  3. "code.google.com/p/weed-fs/go/operation"
  4. "code.google.com/p/weed-fs/go/replication"
  5. "code.google.com/p/weed-fs/go/storage"
  6. "log"
  7. "math/rand"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. func init() {
  17. cmdVolume.Run = runVolume // break init cycle
  18. cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
  19. }
  20. var cmdVolume = &Command{
  21. UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
  22. Short: "start a volume server",
  23. Long: `start a volume server to provide storage spaces
  24. `,
  25. }
  26. var (
  27. vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
  28. volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
  29. ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
  30. publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
  31. masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
  32. vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
  33. maxVolumeCount = cmdVolume.Flag.Int("max", 7, "maximum number of volumes")
  34. vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
  35. vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  36. dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
  37. rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
  38. store *storage.Store
  39. )
  40. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  41. func statusHandler(w http.ResponseWriter, r *http.Request) {
  42. m := make(map[string]interface{})
  43. m["Version"] = VERSION
  44. m["Volumes"] = store.Status()
  45. writeJsonQuiet(w, r, m)
  46. }
  47. func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
  48. err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
  49. if err == nil {
  50. writeJsonQuiet(w, r, map[string]string{"error": ""})
  51. } else {
  52. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  53. }
  54. debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
  55. }
  56. func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
  57. err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
  58. if err == nil {
  59. writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
  60. } else {
  61. writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
  62. }
  63. debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
  64. }
  65. func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
  66. err := store.CompactVolume(r.FormValue("volume"))
  67. if err == nil {
  68. writeJsonQuiet(w, r, map[string]string{"error": ""})
  69. } else {
  70. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  71. }
  72. debug("compacted volume =", r.FormValue("volume"), ", error =", err)
  73. }
  74. func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
  75. err := store.CommitCompactVolume(r.FormValue("volume"))
  76. if err == nil {
  77. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  78. } else {
  79. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  80. }
  81. debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
  82. }
  83. func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
  84. //TODO: notify master that this volume will be read-only
  85. err := store.FreezeVolume(r.FormValue("volume"))
  86. if err == nil {
  87. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  88. } else {
  89. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  90. }
  91. debug("freeze volume =", r.FormValue("volume"), ", error =", err)
  92. }
  93. func storeHandler(w http.ResponseWriter, r *http.Request) {
  94. switch r.Method {
  95. case "GET":
  96. GetOrHeadHandler(w, r, true)
  97. case "HEAD":
  98. GetOrHeadHandler(w, r, false)
  99. case "DELETE":
  100. DeleteHandler(w, r)
  101. case "POST":
  102. PostHandler(w, r)
  103. }
  104. }
  105. func GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  106. n := new(storage.Needle)
  107. vid, fid, filename, ext := parseURLPath(r.URL.Path)
  108. volumeId, err := storage.NewVolumeId(vid)
  109. if err != nil {
  110. debug("parsing error:", err, r.URL.Path)
  111. return
  112. }
  113. n.ParsePath(fid)
  114. debug("volume", volumeId, "reading", n)
  115. if !store.HasVolume(volumeId) {
  116. lookupResult, err := operation.Lookup(*masterNode, volumeId)
  117. debug("volume", volumeId, "found on", lookupResult, "error", err)
  118. if err == nil {
  119. http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  120. } else {
  121. debug("lookup error:", err, r.URL.Path)
  122. w.WriteHeader(http.StatusNotFound)
  123. }
  124. return
  125. }
  126. cookie := n.Cookie
  127. count, e := store.Read(volumeId, n)
  128. debug("read bytes", count, "error", e)
  129. if e != nil || count <= 0 {
  130. debug("read error:", e, r.URL.Path)
  131. w.WriteHeader(http.StatusNotFound)
  132. return
  133. }
  134. if n.Cookie != cookie {
  135. log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  136. w.WriteHeader(http.StatusNotFound)
  137. return
  138. }
  139. if n.LastModified != 0 {
  140. w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  141. if r.Header.Get("If-Modified-Since") != "" {
  142. if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
  143. if t.Unix() >= int64(n.LastModified) {
  144. w.WriteHeader(http.StatusNotModified)
  145. return
  146. }
  147. }
  148. }
  149. }
  150. if n.NameSize > 0 && filename == "" {
  151. filename := string(n.Name)
  152. dotIndex := strings.LastIndex(filename, ".")
  153. if dotIndex > 0 {
  154. ext = filename[dotIndex:]
  155. }
  156. }
  157. mtype := ""
  158. if ext != "" {
  159. mtype = mime.TypeByExtension(ext)
  160. }
  161. if n.MimeSize > 0 {
  162. mtype = string(n.Mime)
  163. }
  164. if mtype != "" {
  165. w.Header().Set("Content-Type", mtype)
  166. }
  167. if filename != "" {
  168. w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename))
  169. }
  170. if ext != ".gz" {
  171. if n.IsGzipped() {
  172. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  173. w.Header().Set("Content-Encoding", "gzip")
  174. } else {
  175. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  176. debug("lookup error:", err, r.URL.Path)
  177. }
  178. }
  179. }
  180. }
  181. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  182. if isGetMethod {
  183. if _, e = w.Write(n.Data); e != nil {
  184. debug("response write error:", e)
  185. }
  186. }
  187. }
  188. func PostHandler(w http.ResponseWriter, r *http.Request) {
  189. if e := r.ParseForm(); e != nil {
  190. debug("form parse error:", e)
  191. writeJsonQuiet(w, r, e)
  192. return
  193. }
  194. vid, _, _, _ := parseURLPath(r.URL.Path)
  195. volumeId, e := storage.NewVolumeId(vid)
  196. if e != nil {
  197. debug("NewVolumeId error:", e)
  198. writeJsonQuiet(w, r, e)
  199. return
  200. }
  201. if e != nil {
  202. writeJsonQuiet(w, r, e)
  203. } else {
  204. needle, ne := storage.NewNeedle(r)
  205. if ne != nil {
  206. writeJsonQuiet(w, r, ne)
  207. } else {
  208. ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r)
  209. m := make(map[string]interface{})
  210. if errorStatus == "" {
  211. w.WriteHeader(http.StatusCreated)
  212. } else {
  213. w.WriteHeader(http.StatusInternalServerError)
  214. m["error"] = errorStatus
  215. }
  216. m["size"] = ret
  217. writeJsonQuiet(w, r, m)
  218. }
  219. }
  220. }
  221. func DeleteHandler(w http.ResponseWriter, r *http.Request) {
  222. n := new(storage.Needle)
  223. vid, fid, _, _ := parseURLPath(r.URL.Path)
  224. volumeId, _ := storage.NewVolumeId(vid)
  225. n.ParsePath(fid)
  226. debug("deleting", n)
  227. cookie := n.Cookie
  228. count, ok := store.Read(volumeId, n)
  229. if ok != nil {
  230. m := make(map[string]uint32)
  231. m["size"] = 0
  232. writeJsonQuiet(w, r, m)
  233. return
  234. }
  235. if n.Cookie != cookie {
  236. log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  237. return
  238. }
  239. n.Size = 0
  240. ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r)
  241. if ret != 0 {
  242. w.WriteHeader(http.StatusAccepted)
  243. } else {
  244. w.WriteHeader(http.StatusInternalServerError)
  245. }
  246. m := make(map[string]uint32)
  247. m["size"] = uint32(count)
  248. writeJsonQuiet(w, r, m)
  249. }
  250. func parseURLPath(path string) (vid, fid, filename, ext string) {
  251. if strings.Count(path, "/") == 3 {
  252. parts := strings.Split(path, "/")
  253. vid, fid, filename = parts[1], parts[2], parts[3]
  254. ext = filename[strings.LastIndex(filename, "."):]
  255. } else {
  256. sepIndex := strings.LastIndex(path, "/")
  257. commaIndex := strings.LastIndex(path[sepIndex:], ",")
  258. if commaIndex <= 0 {
  259. if "favicon.ico" != path[sepIndex+1:] {
  260. log.Println("unknown file id", path[sepIndex+1:])
  261. }
  262. return
  263. }
  264. dotIndex := strings.LastIndex(path[sepIndex:], ".")
  265. vid = path[sepIndex+1 : commaIndex]
  266. fid = path[commaIndex+1:]
  267. ext = ""
  268. if dotIndex > 0 {
  269. fid = path[commaIndex+1 : dotIndex]
  270. ext = path[dotIndex:]
  271. }
  272. }
  273. return
  274. }
  275. func runVolume(cmd *Command, args []string) bool {
  276. if *vMaxCpu < 1 {
  277. *vMaxCpu = runtime.NumCPU()
  278. }
  279. runtime.GOMAXPROCS(*vMaxCpu)
  280. fileInfo, err := os.Stat(*volumeFolder)
  281. if err != nil {
  282. log.Fatalf("No Existing Folder:%s", *volumeFolder)
  283. }
  284. if !fileInfo.IsDir() {
  285. log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
  286. }
  287. perm := fileInfo.Mode().Perm()
  288. log.Println("Volume Folder permission:", perm)
  289. if *publicUrl == "" {
  290. *publicUrl = *ip + ":" + strconv.Itoa(*vport)
  291. }
  292. store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
  293. defer store.Close()
  294. http.HandleFunc("/", storeHandler)
  295. http.HandleFunc("/status", statusHandler)
  296. http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
  297. http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
  298. http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
  299. http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
  300. http.HandleFunc("/admin/freeze_volume", freezeVolumeHandler)
  301. go func() {
  302. connected := true
  303. store.SetMaster(*masterNode)
  304. store.SetDataCenter(*dataCenter)
  305. store.SetRack(*rack)
  306. for {
  307. err := store.Join()
  308. if err == nil {
  309. if !connected {
  310. connected = true
  311. log.Println("Reconnected with master")
  312. }
  313. } else {
  314. if connected {
  315. connected = false
  316. }
  317. }
  318. time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
  319. }
  320. }()
  321. log.Println("store joined at", *masterNode)
  322. log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
  323. srv := &http.Server{
  324. Addr: ":" + strconv.Itoa(*vport),
  325. Handler: http.DefaultServeMux,
  326. ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
  327. }
  328. e := srv.ListenAndServe()
  329. if e != nil {
  330. log.Fatalf("Fail to start:%s", e.Error())
  331. }
  332. return true
  333. }