You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

254 lines
8.0 KiB

  1. package weed_server
  2. import (
  3. "code.google.com/p/weed-fs/go/glog"
  4. "code.google.com/p/weed-fs/go/operation"
  5. "code.google.com/p/weed-fs/go/replication"
  6. "code.google.com/p/weed-fs/go/storage"
  7. "mime"
  8. "net/http"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  14. func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
  15. m := make(map[string]interface{})
  16. m["Version"] = vs.version
  17. m["Volumes"] = vs.store.Status()
  18. writeJsonQuiet(w, r, m)
  19. }
  20. func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
  21. err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"))
  22. if err == nil {
  23. writeJsonQuiet(w, r, map[string]string{"error": ""})
  24. } else {
  25. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  26. }
  27. glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err)
  28. }
  29. func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) {
  30. if "benchmark" != r.FormValue("collection") {
  31. glog.V(0).Infoln("deleting collection =", r.FormValue("collection"), "!!!")
  32. return
  33. }
  34. err := vs.store.DeleteCollection(r.FormValue("collection"))
  35. if err == nil {
  36. writeJsonQuiet(w, r, map[string]string{"error": ""})
  37. } else {
  38. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  39. }
  40. glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
  41. }
  42. func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
  43. err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
  44. if err == nil {
  45. writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
  46. } else {
  47. writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
  48. }
  49. glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
  50. }
  51. func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
  52. err := vs.store.CompactVolume(r.FormValue("volume"))
  53. if err == nil {
  54. writeJsonQuiet(w, r, map[string]string{"error": ""})
  55. } else {
  56. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  57. }
  58. glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err)
  59. }
  60. func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
  61. err := vs.store.CommitCompactVolume(r.FormValue("volume"))
  62. if err == nil {
  63. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  64. } else {
  65. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  66. }
  67. glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err)
  68. }
  69. func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
  70. //TODO: notify master that this volume will be read-only
  71. err := vs.store.FreezeVolume(r.FormValue("volume"))
  72. if err == nil {
  73. writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
  74. } else {
  75. writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
  76. }
  77. glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
  78. }
  79. func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) {
  80. submitForClientHandler(w, r, vs.masterNode)
  81. }
  82. func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
  83. switch r.Method {
  84. case "GET":
  85. vs.GetOrHeadHandler(w, r, true)
  86. case "HEAD":
  87. vs.GetOrHeadHandler(w, r, false)
  88. case "DELETE":
  89. secure(vs.whiteList, vs.DeleteHandler)(w, r)
  90. case "PUT":
  91. secure(vs.whiteList, vs.PostHandler)(w, r)
  92. case "POST":
  93. secure(vs.whiteList, vs.PostHandler)(w, r)
  94. }
  95. }
  96. func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  97. n := new(storage.Needle)
  98. vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
  99. volumeId, err := storage.NewVolumeId(vid)
  100. if err != nil {
  101. glog.V(2).Infoln("parsing error:", err, r.URL.Path)
  102. w.WriteHeader(http.StatusBadRequest)
  103. return
  104. }
  105. err = n.ParsePath(fid)
  106. if err != nil {
  107. glog.V(2).Infoln("parsing fid error:", err, r.URL.Path)
  108. w.WriteHeader(http.StatusBadRequest)
  109. return
  110. }
  111. glog.V(2).Infoln("volume", volumeId, "reading", n)
  112. if !vs.store.HasVolume(volumeId) {
  113. lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
  114. glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
  115. if err == nil && len(lookupResult.Locations) > 0 {
  116. http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
  117. } else {
  118. glog.V(2).Infoln("lookup error:", err, r.URL.Path)
  119. w.WriteHeader(http.StatusNotFound)
  120. }
  121. return
  122. }
  123. cookie := n.Cookie
  124. count, e := vs.store.Read(volumeId, n)
  125. glog.V(2).Infoln("read bytes", count, "error", e)
  126. if e != nil || count <= 0 {
  127. glog.V(0).Infoln("read error:", e, r.URL.Path)
  128. w.WriteHeader(http.StatusNotFound)
  129. return
  130. }
  131. if n.Cookie != cookie {
  132. glog.V(0).Infoln("request with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
  133. w.WriteHeader(http.StatusNotFound)
  134. return
  135. }
  136. if n.LastModified != 0 {
  137. w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  138. if r.Header.Get("If-Modified-Since") != "" {
  139. if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
  140. if t.Unix() >= int64(n.LastModified) {
  141. w.WriteHeader(http.StatusNotModified)
  142. return
  143. }
  144. }
  145. }
  146. }
  147. if n.NameSize > 0 && filename == "" {
  148. filename = string(n.Name)
  149. dotIndex := strings.LastIndex(filename, ".")
  150. if dotIndex > 0 {
  151. ext = filename[dotIndex:]
  152. }
  153. }
  154. mtype := ""
  155. if ext != "" {
  156. mtype = mime.TypeByExtension(ext)
  157. }
  158. if n.MimeSize > 0 {
  159. mtype = string(n.Mime)
  160. }
  161. if mtype != "" {
  162. w.Header().Set("Content-Type", mtype)
  163. }
  164. if filename != "" {
  165. w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename))
  166. }
  167. if ext != ".gz" {
  168. if n.IsGzipped() {
  169. if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  170. w.Header().Set("Content-Encoding", "gzip")
  171. } else {
  172. if n.Data, err = storage.UnGzipData(n.Data); err != nil {
  173. glog.V(0).Infoln("lookup error:", err, r.URL.Path)
  174. }
  175. }
  176. }
  177. }
  178. w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
  179. if isGetMethod {
  180. if _, e = w.Write(n.Data); e != nil {
  181. glog.V(0).Infoln("response write error:", e)
  182. }
  183. }
  184. }
  185. func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  186. m := make(map[string]interface{})
  187. if e := r.ParseForm(); e != nil {
  188. glog.V(0).Infoln("form parse error:", e)
  189. writeJsonError(w, r, e)
  190. return
  191. }
  192. vid, _, _, _, _ := parseURLPath(r.URL.Path)
  193. volumeId, ve := storage.NewVolumeId(vid)
  194. if ve != nil {
  195. glog.V(0).Infoln("NewVolumeId error:", ve)
  196. writeJsonError(w, r, ve)
  197. return
  198. }
  199. needle, ne := storage.NewNeedle(r)
  200. if ne != nil {
  201. writeJsonError(w, r, ne)
  202. return
  203. }
  204. ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
  205. if errorStatus == "" {
  206. w.WriteHeader(http.StatusCreated)
  207. } else {
  208. w.WriteHeader(http.StatusInternalServerError)
  209. m["error"] = errorStatus
  210. }
  211. m["size"] = ret
  212. writeJsonQuiet(w, r, m)
  213. }
  214. func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  215. n := new(storage.Needle)
  216. vid, fid, _, _, _ := parseURLPath(r.URL.Path)
  217. volumeId, _ := storage.NewVolumeId(vid)
  218. n.ParsePath(fid)
  219. glog.V(2).Infoln("deleting", n)
  220. cookie := n.Cookie
  221. count, ok := vs.store.Read(volumeId, n)
  222. if ok != nil {
  223. m := make(map[string]uint32)
  224. m["size"] = 0
  225. writeJsonQuiet(w, r, m)
  226. return
  227. }
  228. if n.Cookie != cookie {
  229. glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
  230. return
  231. }
  232. n.Size = 0
  233. ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
  234. if ret != 0 {
  235. w.WriteHeader(http.StatusAccepted)
  236. } else {
  237. w.WriteHeader(http.StatusInternalServerError)
  238. }
  239. m := make(map[string]uint32)
  240. m["size"] = uint32(count)
  241. writeJsonQuiet(w, r, m)
  242. }