You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

393 lines
12 KiB

13 years ago
13 years ago
13 years ago
13 years ago
12 years ago
13 years ago
13 years ago
  1. package main
  2. import (
  3. "bytes"
  4. "code.google.com/p/weed-fs/go/operation"
  5. "code.google.com/p/weed-fs/go/storage"
  6. "log"
  7. "math/rand"
  8. "mime"
  9. "net/http"
  10. "os"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. func init() {
  17. cmdVolume.Run = runVolume // break init cycle
  18. cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
  19. }
  20. var cmdVolume = &Command{
  21. UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
  22. Short: "start a volume server",
  23. Long: `start a volume server to provide storage spaces
  24. `,
  25. }
  26. var (
  27. vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
  28. volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
  29. ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
  30. publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
  31. masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
  32. vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
  33. maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
  34. vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
  35. vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  36. store *storage.Store
  37. )
  38. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  39. func statusHandler(w http.ResponseWriter, r *http.Request) {
  40. m := make(map[string]interface{})
  41. m["Version"] = VERSION
  42. m["Volumes"] = store.Status()
  43. writeJsonQuiet(w, r, m)
  44. }
  45. func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
  46. err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
  47. if err == nil {
  48. writeJsonQuiet(w, r, map[string]string{"error": ""})
  49. } else {
  50. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  51. }
  52. debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
  53. }
  54. func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
  55. err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
  56. if err == nil {
  57. writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
  58. } else {
  59. writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
  60. }
  61. debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
  62. }
  63. func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
  64. err := store.CompactVolume(r.FormValue("volume"))
  65. if err == nil {
  66. writeJsonQuiet(w, r, map[string]string{"error": ""})
  67. } else {
  68. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  69. }
  70. debug("compacted volume =", r.FormValue("volume"), ", error =", err)
  71. }
  72. func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
  73. err := store.CommitCompactVolume(r.FormValue("volume"))
  74. if err == nil {
  75. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  76. } else {
  77. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  78. }
  79. debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
  80. }
  81. func storeHandler(w http.ResponseWriter, r *http.Request) {
  82. switch r.Method {
  83. case "GET":
  84. GetHandler(w, r)
  85. case "DELETE":
  86. DeleteHandler(w, r)
  87. case "POST":
  88. PostHandler(w, r)
  89. }
  90. }
  91. func GetHandler(w http.ResponseWriter, r *http.Request) {
  92. n := new(storage.Needle)
  93. vid, fid, ext := parseURLPath(r.URL.Path)
  94. volumeId, err := storage.NewVolumeId(vid)
  95. if err != nil {
  96. debug("parsing error:", err, r.URL.Path)
  97. return
  98. }
  99. n.ParsePath(fid)
  100. debug("volume", volumeId, "reading", n)
  101. if !store.HasVolume(volumeId) {
  102. lookupResult, err := operation.Lookup(*masterNode, volumeId)
  103. debug("volume", volumeId, "found on", lookupResult, "error", err)
  104. if err == nil {
  105. http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  106. } else {
  107. debug("lookup error:", err, r.URL.Path)
  108. w.WriteHeader(http.StatusNotFound)
  109. }
  110. return
  111. }
  112. cookie := n.Cookie
  113. count, e := store.Read(volumeId, n)
  114. debug("read bytes", count, "error", e)
  115. if e != nil || count <= 0 {
  116. debug("read error:", e, r.URL.Path)
  117. w.WriteHeader(http.StatusNotFound)
  118. return
  119. }
  120. if n.Cookie != cookie {
  121. log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  122. w.WriteHeader(http.StatusNotFound)
  123. return
  124. }
  125. if n.NameSize > 0 {
  126. fname := string(n.Name)
  127. dotIndex := strings.LastIndex(fname, ".")
  128. if dotIndex > 0 {
  129. ext = fname[dotIndex:]
  130. }
  131. }
  132. mtype := ""
  133. if ext != "" {
  134. mtype = mime.TypeByExtension(ext)
  135. }
  136. if n.MimeSize > 0 {
  137. mtype = string(n.Mime)
  138. }
  139. if mtype != "" {
  140. w.Header().Set("Content-Type", mtype)
  141. }
  142. if n.NameSize > 0 {
  143. w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name)))
  144. }
  145. if ext != ".gz" {
  146. if n.IsGzipped() {
  147. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  148. w.Header().Set("Content-Encoding", "gzip")
  149. } else {
  150. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  151. debug("lookup error:", err, r.URL.Path)
  152. }
  153. }
  154. }
  155. }
  156. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  157. if _, e = w.Write(n.Data); e != nil {
  158. debug("response write error:", e)
  159. }
  160. }
  161. func PostHandler(w http.ResponseWriter, r *http.Request) {
  162. if e := r.ParseForm(); e != nil {
  163. debug("form parse error:", e)
  164. writeJsonQuiet(w, r, e)
  165. return
  166. }
  167. vid, _, _ := parseURLPath(r.URL.Path)
  168. volumeId, e := storage.NewVolumeId(vid)
  169. if e != nil {
  170. debug("NewVolumeId error:", e)
  171. writeJsonQuiet(w, r, e)
  172. return
  173. }
  174. if e != nil {
  175. writeJsonQuiet(w, r, e)
  176. } else {
  177. needle, filename, ne := storage.NewNeedle(r)
  178. if ne != nil {
  179. writeJsonQuiet(w, r, ne)
  180. } else {
  181. ret, err := store.Write(volumeId, needle)
  182. errorStatus := ""
  183. needToReplicate := !store.HasVolume(volumeId)
  184. if err != nil {
  185. errorStatus = "Failed to write to local disk (" + err.Error() + ")"
  186. } else if ret > 0 {
  187. needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate()
  188. } else {
  189. errorStatus = "Failed to write to local disk"
  190. }
  191. if !needToReplicate && ret > 0 {
  192. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  193. }
  194. if needToReplicate { //send to other replica locations
  195. if r.FormValue("type") != "standard" {
  196. if !distributedOperation(volumeId, func(location operation.Location) bool {
  197. _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
  198. return err == nil
  199. }) {
  200. ret = 0
  201. errorStatus = "Failed to write to replicas for volume " + volumeId.String()
  202. }
  203. }
  204. }
  205. m := make(map[string]interface{})
  206. if errorStatus == "" {
  207. w.WriteHeader(http.StatusCreated)
  208. } else {
  209. if _, e = store.Delete(volumeId, needle); e != nil {
  210. errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
  211. strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error()
  212. } else {
  213. distributedOperation(volumeId, func(location operation.Location) bool {
  214. return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
  215. })
  216. }
  217. w.WriteHeader(http.StatusInternalServerError)
  218. m["error"] = errorStatus
  219. }
  220. m["size"] = ret
  221. writeJsonQuiet(w, r, m)
  222. }
  223. }
  224. }
  225. func DeleteHandler(w http.ResponseWriter, r *http.Request) {
  226. n := new(storage.Needle)
  227. vid, fid, _ := parseURLPath(r.URL.Path)
  228. volumeId, _ := storage.NewVolumeId(vid)
  229. n.ParsePath(fid)
  230. debug("deleting", n)
  231. cookie := n.Cookie
  232. count, ok := store.Read(volumeId, n)
  233. if ok != nil {
  234. m := make(map[string]uint32)
  235. m["size"] = 0
  236. writeJsonQuiet(w, r, m)
  237. return
  238. }
  239. if n.Cookie != cookie {
  240. log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  241. return
  242. }
  243. n.Size = 0
  244. ret, err := store.Delete(volumeId, n)
  245. if err != nil {
  246. log.Println("delete error:", err)
  247. return
  248. }
  249. needToReplicate := !store.HasVolume(volumeId)
  250. if !needToReplicate && ret > 0 {
  251. needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
  252. }
  253. if needToReplicate { //send to other replica locations
  254. if r.FormValue("type") != "standard" {
  255. if !distributedOperation(volumeId, func(location operation.Location) bool {
  256. return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
  257. }) {
  258. ret = 0
  259. }
  260. }
  261. }
  262. if ret != 0 {
  263. w.WriteHeader(http.StatusAccepted)
  264. } else {
  265. w.WriteHeader(http.StatusInternalServerError)
  266. }
  267. m := make(map[string]uint32)
  268. m["size"] = uint32(count)
  269. writeJsonQuiet(w, r, m)
  270. }
  271. func parseURLPath(path string) (vid, fid, ext string) {
  272. sepIndex := strings.LastIndex(path, "/")
  273. commaIndex := strings.LastIndex(path[sepIndex:], ",")
  274. if commaIndex <= 0 {
  275. if "favicon.ico" != path[sepIndex+1:] {
  276. log.Println("unknown file id", path[sepIndex+1:])
  277. }
  278. return
  279. }
  280. dotIndex := strings.LastIndex(path[sepIndex:], ".")
  281. vid = path[sepIndex+1 : commaIndex]
  282. fid = path[commaIndex+1:]
  283. ext = ""
  284. if dotIndex > 0 {
  285. fid = path[commaIndex+1 : dotIndex]
  286. ext = path[dotIndex:]
  287. }
  288. return
  289. }
  290. func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
  291. if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
  292. length := 0
  293. selfUrl := (*ip + ":" + strconv.Itoa(*vport))
  294. results := make(chan bool)
  295. for _, location := range lookupResult.Locations {
  296. if location.Url != selfUrl {
  297. length++
  298. go func(location operation.Location, results chan bool) {
  299. results <- op(location)
  300. }(location, results)
  301. }
  302. }
  303. ret := true
  304. for i := 0; i < length; i++ {
  305. ret = ret && <-results
  306. }
  307. return ret
  308. } else {
  309. log.Println("Failed to lookup for", volumeId, lookupErr.Error())
  310. }
  311. return false
  312. }
  313. func runVolume(cmd *Command, args []string) bool {
  314. if *vMaxCpu < 1 {
  315. *vMaxCpu = runtime.NumCPU()
  316. }
  317. runtime.GOMAXPROCS(*vMaxCpu)
  318. fileInfo, err := os.Stat(*volumeFolder)
  319. if err != nil {
  320. log.Fatalf("No Existing Folder:%s", *volumeFolder)
  321. }
  322. if !fileInfo.IsDir() {
  323. log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
  324. }
  325. perm := fileInfo.Mode().Perm()
  326. log.Println("Volume Folder permission:", perm)
  327. if *publicUrl == "" {
  328. *publicUrl = *ip + ":" + strconv.Itoa(*vport)
  329. }
  330. store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
  331. defer store.Close()
  332. http.HandleFunc("/", storeHandler)
  333. http.HandleFunc("/status", statusHandler)
  334. http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
  335. http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
  336. http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
  337. http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
  338. go func() {
  339. connected := true
  340. store.SetMaster(*masterNode)
  341. for {
  342. err := store.Join()
  343. if err == nil {
  344. if !connected {
  345. connected = true
  346. log.Println("Reconnected with master")
  347. }
  348. } else {
  349. if connected {
  350. connected = false
  351. }
  352. }
  353. time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
  354. }
  355. }()
  356. log.Println("store joined at", *masterNode)
  357. log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
  358. srv := &http.Server{
  359. Addr: ":" + strconv.Itoa(*vport),
  360. Handler: http.DefaultServeMux,
  361. ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
  362. }
  363. e := srv.ListenAndServe()
  364. if e != nil {
  365. log.Fatalf("Fail to start:%s", e.Error())
  366. }
  367. return true
  368. }