You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
5.6 KiB

9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/storage"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/syndtr/goleveldb/leveldb"
  15. )
  16. type analogueReader struct {
  17. *bytes.Buffer
  18. }
  19. // So that it implements the io.ReadCloser interface
  20. func (m analogueReader) Close() error { return nil }
  21. type FilerPostResult struct {
  22. Name string `json:"name,omitempty"`
  23. Size uint32 `json:"size,omitempty"`
  24. Error string `json:"error,omitempty"`
  25. Fid string `json:"fid,omitempty"`
  26. Url string `json:"url,omitempty"`
  27. }
  28. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  29. query := r.URL.Query()
  30. replication := query.Get("replication")
  31. if replication == "" {
  32. replication = fs.defaultReplication
  33. }
  34. collection := query.Get("collection")
  35. if collection == "" {
  36. collection = fs.collection
  37. }
  38. var fileId string
  39. var err error
  40. var urlLocation string
  41. if r.Method == "PUT" {
  42. buf, _ := ioutil.ReadAll(r.Body)
  43. r.Body = analogueReader{bytes.NewBuffer(buf)}
  44. fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
  45. if pe != nil {
  46. glog.V(0).Infoln("failing to parse post body", pe.Error())
  47. writeJsonError(w, r, http.StatusInternalServerError, pe)
  48. return
  49. }
  50. //reconstruct http request body for following new request to volume server
  51. r.Body = analogueReader{bytes.NewBuffer(buf)}
  52. path := r.URL.Path
  53. if strings.HasSuffix(path, "/") {
  54. if fileName != "" {
  55. path += fileName
  56. }
  57. }
  58. if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
  59. glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
  60. writeJsonError(w, r, http.StatusInternalServerError, err)
  61. return
  62. } else if fileId != "" && err == nil {
  63. var le error
  64. urlLocation, le = operation.LookupFileId(fs.master, fileId)
  65. if le != nil {
  66. glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error())
  67. w.WriteHeader(http.StatusNotFound)
  68. return
  69. }
  70. }
  71. } else {
  72. assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl"))
  73. if ae != nil {
  74. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  75. writeJsonError(w, r, http.StatusInternalServerError, ae)
  76. return
  77. }
  78. fileId = assignResult.Fid
  79. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  80. }
  81. u, _ := url.Parse(urlLocation)
  82. glog.V(4).Infoln("post to", u)
  83. request := &http.Request{
  84. Method: r.Method,
  85. URL: u,
  86. Proto: r.Proto,
  87. ProtoMajor: r.ProtoMajor,
  88. ProtoMinor: r.ProtoMinor,
  89. Header: r.Header,
  90. Body: r.Body,
  91. Host: r.Host,
  92. ContentLength: r.ContentLength,
  93. }
  94. resp, do_err := util.Do(request)
  95. if do_err != nil {
  96. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  97. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  98. return
  99. }
  100. defer resp.Body.Close()
  101. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  102. if ra_err != nil {
  103. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  104. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  105. return
  106. }
  107. glog.V(4).Infoln("post result", string(resp_body))
  108. var ret operation.UploadResult
  109. unmarshal_err := json.Unmarshal(resp_body, &ret)
  110. if unmarshal_err != nil {
  111. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  112. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  113. return
  114. }
  115. if ret.Error != "" {
  116. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  117. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  118. return
  119. }
  120. path := r.URL.Path
  121. if strings.HasSuffix(path, "/") {
  122. if ret.Name != "" {
  123. path += ret.Name
  124. } else {
  125. operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
  126. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  127. writeJsonError(w, r, http.StatusInternalServerError,
  128. errors.New("Can not to write to folder "+path+" without a file name"))
  129. return
  130. }
  131. }
  132. // also delete the old fid unless PUT operation
  133. if r.Method != "PUT" {
  134. if oldFid, err := fs.filer.FindFile(path); err == nil {
  135. operation.DeleteFile(fs.master, oldFid, fs.jwt(oldFid))
  136. }
  137. }
  138. glog.V(4).Infoln("saving", path, "=>", fileId)
  139. if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
  140. operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
  141. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  142. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  143. return
  144. }
  145. reply := FilerPostResult{
  146. Name: ret.Name,
  147. Size: ret.Size,
  148. Error: ret.Error,
  149. Fid: fileId,
  150. Url: urlLocation,
  151. }
  152. writeJsonQuiet(w, r, http.StatusCreated, reply)
  153. }
  154. // curl -X DELETE http://localhost:8888/path/to
  155. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  156. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  157. var err error
  158. var fid string
  159. if strings.HasSuffix(r.URL.Path, "/") {
  160. isRecursive := r.FormValue("recursive") == "true"
  161. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  162. } else {
  163. fid, err = fs.filer.DeleteFile(r.URL.Path)
  164. if err == nil && fid != "" {
  165. err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
  166. }
  167. }
  168. if err == nil {
  169. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  170. } else {
  171. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  172. writeJsonError(w, r, http.StatusInternalServerError, err)
  173. }
  174. }