You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
6.8 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io"
  6. "io/ioutil"
  7. "math/rand"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/go/glog"
  13. "github.com/chrislusf/seaweedfs/go/operation"
  14. "github.com/chrislusf/seaweedfs/go/util"
  15. "github.com/syndtr/goleveldb/leveldb"
  16. )
  17. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  18. switch r.Method {
  19. case "GET":
  20. fs.GetOrHeadHandler(w, r, true)
  21. case "HEAD":
  22. fs.GetOrHeadHandler(w, r, false)
  23. case "DELETE":
  24. fs.DeleteHandler(w, r)
  25. case "PUT":
  26. fs.PostHandler(w, r)
  27. case "POST":
  28. fs.PostHandler(w, r)
  29. }
  30. }
  31. // listDirectoryHandler lists directories and folers under a directory
  32. // files are sorted by name and paginated via "lastFileName" and "limit".
  33. // sub directories are listed on the first page, when "lastFileName"
  34. // is empty.
  35. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
  36. if !strings.HasSuffix(r.URL.Path, "/") {
  37. return
  38. }
  39. dirlist, err := fs.filer.ListDirectories(r.URL.Path)
  40. if err == leveldb.ErrNotFound {
  41. glog.V(3).Infoln("Directory Not Found in db", r.URL.Path)
  42. w.WriteHeader(http.StatusNotFound)
  43. return
  44. }
  45. m := make(map[string]interface{})
  46. m["Directory"] = r.URL.Path
  47. lastFileName := r.FormValue("lastFileName")
  48. if lastFileName == "" {
  49. m["Subdirectories"] = dirlist
  50. }
  51. limit, limit_err := strconv.Atoi(r.FormValue("limit"))
  52. if limit_err != nil {
  53. limit = 100
  54. }
  55. m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
  56. writeJsonQuiet(w, r, http.StatusOK, m)
  57. }
  58. func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  59. if strings.HasSuffix(r.URL.Path, "/") {
  60. if fs.disableDirListing {
  61. w.WriteHeader(http.StatusMethodNotAllowed)
  62. return
  63. }
  64. fs.listDirectoryHandler(w, r)
  65. return
  66. }
  67. fileId, err := fs.filer.FindFile(r.URL.Path)
  68. if err == leveldb.ErrNotFound {
  69. glog.V(3).Infoln("Not found in db", r.URL.Path)
  70. w.WriteHeader(http.StatusNotFound)
  71. return
  72. }
  73. parts := strings.Split(fileId, ",")
  74. if len(parts) != 2 {
  75. glog.V(1).Infoln("Invalid fileId", fileId)
  76. w.WriteHeader(http.StatusNotFound)
  77. return
  78. }
  79. lookup, lookupError := operation.Lookup(fs.master, parts[0])
  80. if lookupError != nil {
  81. glog.V(1).Infoln("Invalid lookup", lookupError.Error())
  82. w.WriteHeader(http.StatusNotFound)
  83. return
  84. }
  85. if len(lookup.Locations) == 0 {
  86. glog.V(1).Infoln("Can not find location for volume", parts[0])
  87. w.WriteHeader(http.StatusNotFound)
  88. return
  89. }
  90. urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url
  91. urlString := "http://" + urlLocation + "/" + fileId
  92. if fs.redirectOnRead {
  93. http.Redirect(w, r, urlString, http.StatusFound)
  94. return
  95. }
  96. u, _ := url.Parse(urlString)
  97. request := &http.Request{
  98. Method: r.Method,
  99. URL: u,
  100. Proto: r.Proto,
  101. ProtoMajor: r.ProtoMajor,
  102. ProtoMinor: r.ProtoMinor,
  103. Header: r.Header,
  104. Body: r.Body,
  105. Host: r.Host,
  106. ContentLength: r.ContentLength,
  107. }
  108. glog.V(3).Infoln("retrieving from", u)
  109. resp, do_err := util.Do(request)
  110. if do_err != nil {
  111. glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
  112. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  113. return
  114. }
  115. defer resp.Body.Close()
  116. for k, v := range resp.Header {
  117. w.Header()[k] = v
  118. }
  119. w.WriteHeader(resp.StatusCode)
  120. io.Copy(w, resp.Body)
  121. }
  122. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  123. query := r.URL.Query()
  124. replication := query.Get("replication")
  125. if replication == "" {
  126. replication = fs.defaultReplication
  127. }
  128. collection := query.Get("collection")
  129. if collection == "" {
  130. collection = fs.collection
  131. }
  132. assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl"))
  133. if ae != nil {
  134. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  135. writeJsonError(w, r, http.StatusInternalServerError, ae)
  136. return
  137. }
  138. u, _ := url.Parse("http://" + assignResult.Url + "/" + assignResult.Fid)
  139. glog.V(4).Infoln("post to", u)
  140. request := &http.Request{
  141. Method: r.Method,
  142. URL: u,
  143. Proto: r.Proto,
  144. ProtoMajor: r.ProtoMajor,
  145. ProtoMinor: r.ProtoMinor,
  146. Header: r.Header,
  147. Body: r.Body,
  148. Host: r.Host,
  149. ContentLength: r.ContentLength,
  150. }
  151. resp, do_err := util.Do(request)
  152. if do_err != nil {
  153. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  154. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  155. return
  156. }
  157. defer resp.Body.Close()
  158. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  159. if ra_err != nil {
  160. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  161. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  162. return
  163. }
  164. glog.V(4).Infoln("post result", string(resp_body))
  165. var ret operation.UploadResult
  166. unmarshal_err := json.Unmarshal(resp_body, &ret)
  167. if unmarshal_err != nil {
  168. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  169. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  170. return
  171. }
  172. if ret.Error != "" {
  173. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  174. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  175. return
  176. }
  177. path := r.URL.Path
  178. if strings.HasSuffix(path, "/") {
  179. if ret.Name != "" {
  180. path += ret.Name
  181. } else {
  182. operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
  183. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  184. writeJsonError(w, r, http.StatusInternalServerError,
  185. errors.New("Can not to write to folder "+path+" without a file name"))
  186. return
  187. }
  188. }
  189. glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
  190. if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
  191. operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
  192. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  193. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  194. return
  195. }
  196. w.WriteHeader(http.StatusCreated)
  197. w.Write(resp_body)
  198. }
  199. // curl -X DELETE http://localhost:8888/path/to
  200. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  201. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  202. var err error
  203. var fid string
  204. if strings.HasSuffix(r.URL.Path, "/") {
  205. isRecursive := r.FormValue("recursive") == "true"
  206. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  207. } else {
  208. fid, err = fs.filer.DeleteFile(r.URL.Path)
  209. if err == nil && fid != "" {
  210. err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
  211. }
  212. }
  213. if err == nil {
  214. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  215. } else {
  216. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  217. writeJsonError(w, r, http.StatusInternalServerError, err)
  218. }
  219. }