You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
6.3 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io"
  6. "io/ioutil"
  7. "math/rand"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "github.com/chrislusf/weed-fs/go/glog"
  13. "github.com/chrislusf/weed-fs/go/operation"
  14. "github.com/chrislusf/weed-fs/go/util"
  15. "github.com/syndtr/goleveldb/leveldb"
  16. )
  17. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  18. switch r.Method {
  19. case "GET":
  20. fs.GetOrHeadHandler(w, r, true)
  21. case "HEAD":
  22. fs.GetOrHeadHandler(w, r, false)
  23. case "DELETE":
  24. fs.DeleteHandler(w, r)
  25. case "PUT":
  26. fs.PostHandler(w, r)
  27. case "POST":
  28. fs.PostHandler(w, r)
  29. }
  30. }
  31. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
  32. if !strings.HasSuffix(r.URL.Path, "/") {
  33. return
  34. }
  35. dirlist, err := fs.filer.ListDirectories(r.URL.Path)
  36. if err == leveldb.ErrNotFound {
  37. glog.V(3).Infoln("Directory Not Found in db", r.URL.Path)
  38. w.WriteHeader(http.StatusNotFound)
  39. return
  40. }
  41. m := make(map[string]interface{})
  42. m["Directory"] = r.URL.Path
  43. m["Subdirectories"] = dirlist
  44. lastFileName := r.FormValue("lastFileName")
  45. limit, limit_err := strconv.Atoi(r.FormValue("limit"))
  46. if limit_err != nil {
  47. limit = 100
  48. }
  49. m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
  50. writeJsonQuiet(w, r, http.StatusOK, m)
  51. }
  52. func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  53. if strings.HasSuffix(r.URL.Path, "/") {
  54. fs.listDirectoryHandler(w, r)
  55. return
  56. }
  57. fileId, err := fs.filer.FindFile(r.URL.Path)
  58. if err == leveldb.ErrNotFound {
  59. glog.V(3).Infoln("Not found in db", r.URL.Path)
  60. w.WriteHeader(http.StatusNotFound)
  61. return
  62. }
  63. parts := strings.Split(fileId, ",")
  64. if len(parts) != 2 {
  65. glog.V(1).Infoln("Invalid fileId", fileId)
  66. w.WriteHeader(http.StatusNotFound)
  67. return
  68. }
  69. lookup, lookupError := operation.Lookup(fs.master, parts[0])
  70. if lookupError != nil {
  71. glog.V(1).Infoln("Invalid lookup", lookupError.Error())
  72. w.WriteHeader(http.StatusNotFound)
  73. return
  74. }
  75. if len(lookup.Locations) == 0 {
  76. glog.V(1).Infoln("Can not find location for volume", parts[0])
  77. w.WriteHeader(http.StatusNotFound)
  78. return
  79. }
  80. urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl
  81. urlString := "http://" + urlLocation + "/" + fileId
  82. if fs.redirectOnRead {
  83. http.Redirect(w, r, urlString, http.StatusFound)
  84. return
  85. }
  86. u, _ := url.Parse(urlString)
  87. request := &http.Request{
  88. Method: r.Method,
  89. URL: u,
  90. Proto: r.Proto,
  91. ProtoMajor: r.ProtoMajor,
  92. ProtoMinor: r.ProtoMinor,
  93. Header: r.Header,
  94. Body: r.Body,
  95. Host: r.Host,
  96. ContentLength: r.ContentLength,
  97. }
  98. glog.V(3).Infoln("retrieving from", u)
  99. resp, do_err := util.Do(request)
  100. if do_err != nil {
  101. glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
  102. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  103. return
  104. }
  105. defer resp.Body.Close()
  106. for k, v := range resp.Header {
  107. w.Header()[k] = v
  108. }
  109. w.WriteHeader(resp.StatusCode)
  110. io.Copy(w, resp.Body)
  111. }
  112. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  113. query := r.URL.Query()
  114. replication := query.Get("replication")
  115. if replication == "" {
  116. replication = fs.defaultReplication
  117. }
  118. assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl"))
  119. if ae != nil {
  120. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  121. writeJsonError(w, r, http.StatusInternalServerError, ae)
  122. return
  123. }
  124. u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid)
  125. glog.V(4).Infoln("post to", u)
  126. request := &http.Request{
  127. Method: r.Method,
  128. URL: u,
  129. Proto: r.Proto,
  130. ProtoMajor: r.ProtoMajor,
  131. ProtoMinor: r.ProtoMinor,
  132. Header: r.Header,
  133. Body: r.Body,
  134. Host: r.Host,
  135. ContentLength: r.ContentLength,
  136. }
  137. resp, do_err := util.Do(request)
  138. if do_err != nil {
  139. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  140. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  141. return
  142. }
  143. defer resp.Body.Close()
  144. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  145. if ra_err != nil {
  146. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  147. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  148. return
  149. }
  150. glog.V(4).Infoln("post result", string(resp_body))
  151. var ret operation.UploadResult
  152. unmarshal_err := json.Unmarshal(resp_body, &ret)
  153. if unmarshal_err != nil {
  154. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  155. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  156. return
  157. }
  158. if ret.Error != "" {
  159. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  160. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  161. return
  162. }
  163. path := r.URL.Path
  164. if strings.HasSuffix(path, "/") {
  165. if ret.Name != "" {
  166. path += ret.Name
  167. } else {
  168. operation.DeleteFile(fs.master, assignResult.Fid) //clean up
  169. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  170. writeJsonError(w, r, http.StatusInternalServerError,
  171. errors.New("Can not to write to folder "+path+" without a file name"))
  172. return
  173. }
  174. }
  175. glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
  176. if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
  177. operation.DeleteFile(fs.master, assignResult.Fid) //clean up
  178. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  179. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  180. return
  181. }
  182. w.WriteHeader(http.StatusCreated)
  183. w.Write(resp_body)
  184. }
  185. // curl -X DELETE http://localhost:8888/path/to
  186. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  187. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  188. var err error
  189. var fid string
  190. if strings.HasSuffix(r.URL.Path, "/") {
  191. isRecursive := r.FormValue("recursive") == "true"
  192. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  193. } else {
  194. fid, err = fs.filer.DeleteFile(r.URL.Path)
  195. if err == nil {
  196. err = operation.DeleteFile(fs.master, fid)
  197. }
  198. }
  199. if err == nil {
  200. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  201. } else {
  202. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  203. writeJsonError(w, r, http.StatusInternalServerError, err)
  204. }
  205. }