You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

302 lines
8.8 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "io"
  7. "io/ioutil"
  8. "math/rand"
  9. "net/http"
  10. "net/url"
  11. "strconv"
  12. "strings"
  13. "github.com/chrislusf/seaweedfs/go/glog"
  14. "github.com/chrislusf/seaweedfs/go/operation"
  15. "github.com/chrislusf/seaweedfs/go/storage"
  16. "github.com/chrislusf/seaweedfs/go/util"
  17. "github.com/syndtr/goleveldb/leveldb"
  18. )
  19. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  20. switch r.Method {
  21. case "GET":
  22. fs.GetOrHeadHandler(w, r, true)
  23. case "HEAD":
  24. fs.GetOrHeadHandler(w, r, false)
  25. case "DELETE":
  26. fs.DeleteHandler(w, r)
  27. case "PUT":
  28. fs.PostHandler(w, r)
  29. case "POST":
  30. fs.PostHandler(w, r)
  31. }
  32. }
  33. func (fs *FilerServer) publicReadOnlyfilerHandler(w http.ResponseWriter, r *http.Request) {
  34. switch r.Method {
  35. case "GET":
  36. fs.GetOrHeadHandler(w, r, true)
  37. case "HEAD":
  38. fs.GetOrHeadHandler(w, r, false)
  39. }
  40. }
  41. // listDirectoryHandler lists directories and folers under a directory
  42. // files are sorted by name and paginated via "lastFileName" and "limit".
  43. // sub directories are listed on the first page, when "lastFileName"
  44. // is empty.
  45. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
  46. if !strings.HasSuffix(r.URL.Path, "/") {
  47. return
  48. }
  49. dirlist, err := fs.filer.ListDirectories(r.URL.Path)
  50. if err == leveldb.ErrNotFound {
  51. glog.V(3).Infoln("Directory Not Found in db", r.URL.Path)
  52. w.WriteHeader(http.StatusNotFound)
  53. return
  54. }
  55. m := make(map[string]interface{})
  56. m["Directory"] = r.URL.Path
  57. lastFileName := r.FormValue("lastFileName")
  58. if lastFileName == "" {
  59. m["Subdirectories"] = dirlist
  60. }
  61. limit, limit_err := strconv.Atoi(r.FormValue("limit"))
  62. if limit_err != nil {
  63. limit = 100
  64. }
  65. m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
  66. writeJsonQuiet(w, r, http.StatusOK, m)
  67. }
  68. func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
  69. if strings.HasSuffix(r.URL.Path, "/") {
  70. if fs.disableDirListing {
  71. w.WriteHeader(http.StatusMethodNotAllowed)
  72. return
  73. }
  74. fs.listDirectoryHandler(w, r)
  75. return
  76. }
  77. var fileId string
  78. uri_elements := strings.Split(r.URL.Path, "/")
  79. //glog.V(1).Infof("debug uri_elements %#v", uri_elements)
  80. if len(uri_elements) == 2 {
  81. // /3,01e587647e will be split into []string{"","3,01e587647e"}
  82. fileId = uri_elements[1]
  83. } else {
  84. var err error
  85. fileId, err = fs.filer.FindFile(r.URL.Path)
  86. if err == leveldb.ErrNotFound {
  87. glog.V(3).Infoln("Not found in db", r.URL.Path)
  88. w.WriteHeader(http.StatusNotFound)
  89. return
  90. }
  91. }
  92. parts := strings.Split(fileId, ",")
  93. if len(parts) != 2 {
  94. glog.V(1).Infoln("Invalid fileId", fileId)
  95. w.WriteHeader(http.StatusNotFound)
  96. return
  97. }
  98. lookup, lookupError := operation.Lookup(fs.master, parts[0])
  99. if lookupError != nil {
  100. glog.V(1).Infoln("Invalid lookup", lookupError.Error())
  101. w.WriteHeader(http.StatusNotFound)
  102. return
  103. }
  104. if len(lookup.Locations) == 0 {
  105. glog.V(1).Infoln("Can not find location for volume", parts[0])
  106. w.WriteHeader(http.StatusNotFound)
  107. return
  108. }
  109. urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url
  110. urlString := "http://" + urlLocation + "/" + fileId
  111. if fs.redirectOnRead {
  112. http.Redirect(w, r, urlString, http.StatusFound)
  113. return
  114. }
  115. u, _ := url.Parse(urlString)
  116. request := &http.Request{
  117. Method: r.Method,
  118. URL: u,
  119. Proto: r.Proto,
  120. ProtoMajor: r.ProtoMajor,
  121. ProtoMinor: r.ProtoMinor,
  122. Header: r.Header,
  123. Body: r.Body,
  124. Host: r.Host,
  125. ContentLength: r.ContentLength,
  126. }
  127. glog.V(3).Infoln("retrieving from", u)
  128. resp, do_err := util.Do(request)
  129. if do_err != nil {
  130. glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
  131. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  132. return
  133. }
  134. defer resp.Body.Close()
  135. for k, v := range resp.Header {
  136. w.Header()[k] = v
  137. }
  138. w.WriteHeader(resp.StatusCode)
  139. io.Copy(w, resp.Body)
  140. }
  // analogueReader wraps an in-memory buffer and adds a Close method, so
  // that the buffer can be used where an io.ReadCloser is expected (for
  // example as an http.Request body).
  type analogueReader struct {
  	*bytes.Buffer
  }

  // Close does nothing and always returns nil; it exists only so that the
  // type implements the io.ReadCloser interface.
  func (analogueReader) Close() error {
  	return nil
  }
  146. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  147. query := r.URL.Query()
  148. replication := query.Get("replication")
  149. if replication == "" {
  150. replication = fs.defaultReplication
  151. }
  152. collection := query.Get("collection")
  153. if collection == "" {
  154. collection = fs.collection
  155. }
  156. buf, _ := ioutil.ReadAll(r.Body)
  157. r.Body = analogueReader{bytes.NewBuffer(buf)}
  158. fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
  159. if pe != nil {
  160. glog.V(0).Infoln("failing to parse post body", pe.Error())
  161. writeJsonError(w, r, http.StatusInternalServerError, pe)
  162. return
  163. }
  164. //reconstruct http request body for following new request to volume server
  165. r.Body = analogueReader{bytes.NewBuffer(buf)}
  166. path := r.URL.Path
  167. if strings.HasSuffix(path, "/") {
  168. if fileName != "" {
  169. path += fileName
  170. }
  171. }
  172. var fileId string
  173. var err error
  174. var urlLocation string
  175. if fileId, err = fs.filer.FindFile(path); err != nil {
  176. glog.V(0).Infoln("failing to find path in filer store", path, pe.Error())
  177. writeJsonError(w, r, http.StatusInternalServerError, err)
  178. return
  179. } else if fileId != "" {
  180. parts := strings.Split(fileId, ",")
  181. if len(parts) != 2 {
  182. glog.V(1).Infoln("Invalid fileId", fileId)
  183. w.WriteHeader(http.StatusNotFound)
  184. return
  185. }
  186. lookup, lookupError := operation.Lookup(fs.master, parts[0])
  187. if lookupError != nil {
  188. glog.V(1).Infoln("Invalid lookup", lookupError.Error())
  189. w.WriteHeader(http.StatusNotFound)
  190. return
  191. }
  192. if len(lookup.Locations) == 0 {
  193. glog.V(1).Infoln("Can not find location for volume", parts[0])
  194. w.WriteHeader(http.StatusNotFound)
  195. return
  196. }
  197. urlLocation = lookup.Locations[rand.Intn(len(lookup.Locations))].Url
  198. } else {
  199. assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl"))
  200. if ae != nil {
  201. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  202. writeJsonError(w, r, http.StatusInternalServerError, ae)
  203. return
  204. }
  205. fileId = assignResult.Fid
  206. urlLocation = assignResult.Url
  207. }
  208. u, _ := url.Parse("http://" + urlLocation + "/" + fileId)
  209. glog.V(4).Infoln("post to", u)
  210. request := &http.Request{
  211. Method: r.Method,
  212. URL: u,
  213. Proto: r.Proto,
  214. ProtoMajor: r.ProtoMajor,
  215. ProtoMinor: r.ProtoMinor,
  216. Header: r.Header,
  217. Body: r.Body,
  218. Host: r.Host,
  219. ContentLength: r.ContentLength,
  220. }
  221. resp, do_err := util.Do(request)
  222. if do_err != nil {
  223. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  224. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  225. return
  226. }
  227. defer resp.Body.Close()
  228. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  229. if ra_err != nil {
  230. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  231. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  232. return
  233. }
  234. glog.V(4).Infoln("post result", string(resp_body))
  235. var ret operation.UploadResult
  236. unmarshal_err := json.Unmarshal(resp_body, &ret)
  237. if unmarshal_err != nil {
  238. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  239. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  240. return
  241. }
  242. if ret.Error != "" {
  243. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  244. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  245. return
  246. }
  247. path = r.URL.Path
  248. if strings.HasSuffix(path, "/") {
  249. if ret.Name != "" {
  250. path += ret.Name
  251. } else {
  252. operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
  253. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  254. writeJsonError(w, r, http.StatusInternalServerError,
  255. errors.New("Can not to write to folder "+path+" without a file name"))
  256. return
  257. }
  258. }
  259. glog.V(4).Infoln("saving", path, "=>", fileId)
  260. if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
  261. operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
  262. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  263. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  264. return
  265. }
  266. w.WriteHeader(http.StatusCreated)
  267. w.Write(resp_body)
  268. }
  269. // curl -X DELETE http://localhost:8888/path/to
  270. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  271. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  272. var err error
  273. var fid string
  274. if strings.HasSuffix(r.URL.Path, "/") {
  275. isRecursive := r.FormValue("recursive") == "true"
  276. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  277. } else {
  278. fid, err = fs.filer.DeleteFile(r.URL.Path)
  279. if err == nil && fid != "" {
  280. err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
  281. }
  282. }
  283. if err == nil {
  284. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  285. } else {
  286. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  287. writeJsonError(w, r, http.StatusInternalServerError, err)
  288. }
  289. }