You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

312 lines
9.7 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "mime/multipart"
  12. "net/http"
  13. "net/textproto"
  14. "net/url"
  15. "strings"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/operation"
  18. "github.com/chrislusf/seaweedfs/weed/storage"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/syndtr/goleveldb/leveldb"
  21. )
  22. type FilerPostResult struct {
  23. Name string `json:"name,omitempty"`
  24. Size uint32 `json:"size,omitempty"`
  25. Error string `json:"error,omitempty"`
  26. Fid string `json:"fid,omitempty"`
  27. Url string `json:"url,omitempty"`
  28. }
  29. var quoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  30. func escapeQuotes(s string) string {
  31. return quoteEscaper.Replace(s)
  32. }
  33. func createFormFile(writer *multipart.Writer, fieldname, filename, mime string) (io.Writer, error) {
  34. h := make(textproto.MIMEHeader)
  35. h.Set("Content-Disposition",
  36. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  37. escapeQuotes(fieldname), escapeQuotes(filename)))
  38. if len(mime) == 0 {
  39. mime = "application/octet-stream"
  40. }
  41. h.Set("Content-Type", mime)
  42. return writer.CreatePart(h)
  43. }
  44. func makeFormData(filename, mimeType string, content io.Reader) (formData io.Reader, contentType string, err error) {
  45. buf := new(bytes.Buffer)
  46. writer := multipart.NewWriter(buf)
  47. defer writer.Close()
  48. part, err := createFormFile(writer, "file", filename, mimeType)
  49. if err != nil {
  50. glog.V(0).Infoln(err)
  51. return
  52. }
  53. _, err = io.Copy(part, content)
  54. if err != nil {
  55. glog.V(0).Infoln(err)
  56. return
  57. }
  58. formData = buf
  59. contentType = writer.FormDataContentType()
  60. return
  61. }
  62. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  63. query := r.URL.Query()
  64. replication := query.Get("replication")
  65. if replication == "" {
  66. replication = fs.defaultReplication
  67. }
  68. collection := query.Get("collection")
  69. if collection == "" {
  70. collection = fs.collection
  71. }
  72. var fileId string
  73. var err error
  74. var urlLocation string
  75. if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
  76. //Default handle way for http multipart
  77. if r.Method == "PUT" {
  78. buf, _ := ioutil.ReadAll(r.Body)
  79. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  80. fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
  81. if pe != nil {
  82. glog.V(0).Infoln("failing to parse post body", pe.Error())
  83. writeJsonError(w, r, http.StatusInternalServerError, pe)
  84. return
  85. }
  86. //reconstruct http request body for following new request to volume server
  87. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  88. path := r.URL.Path
  89. if strings.HasSuffix(path, "/") {
  90. if fileName != "" {
  91. path += fileName
  92. }
  93. }
  94. if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
  95. glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
  96. writeJsonError(w, r, http.StatusInternalServerError, err)
  97. return
  98. } else if fileId != "" && err == nil {
  99. var le error
  100. urlLocation, le = operation.LookupFileId(fs.getMasterNode(), fileId)
  101. if le != nil {
  102. glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error())
  103. w.WriteHeader(http.StatusNotFound)
  104. return
  105. }
  106. }
  107. } else {
  108. assignResult, ae := operation.Assign(fs.getMasterNode(), 1, replication, collection, query.Get("ttl"))
  109. if ae != nil {
  110. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  111. writeJsonError(w, r, http.StatusInternalServerError, ae)
  112. return
  113. }
  114. fileId = assignResult.Fid
  115. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  116. }
  117. } else {
  118. /*
  119. Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
  120. There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
  121. a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
  122. 1. The request url format should be http://$host:$port/$bucketName/$objectName
  123. 2. bucketName will be mapped to seaweedfs's collection name
  124. */
  125. lastPos := strings.LastIndex(r.URL.Path, "/")
  126. if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
  127. glog.V(0).Infoln("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
  128. writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("URL Path is invalid"))
  129. return
  130. }
  131. secondPos := strings.Index(r.URL.Path[1:], "/") + 1
  132. collection = r.URL.Path[1:secondPos]
  133. path := r.URL.Path
  134. if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
  135. glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
  136. writeJsonError(w, r, http.StatusInternalServerError, err)
  137. return
  138. } else if fileId != "" && err == nil {
  139. var le error
  140. urlLocation, le = operation.LookupFileId(fs.getMasterNode(), fileId)
  141. if le != nil {
  142. glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error())
  143. w.WriteHeader(http.StatusNotFound)
  144. return
  145. }
  146. } else {
  147. assignResult, ae := operation.Assign(fs.getMasterNode(), 1, replication, collection, query.Get("ttl"))
  148. if ae != nil {
  149. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  150. writeJsonError(w, r, http.StatusInternalServerError, ae)
  151. return
  152. }
  153. fileId = assignResult.Fid
  154. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  155. }
  156. if contentMD5 := r.Header.Get("Content-MD5"); contentMD5 != "" {
  157. buf, _ := ioutil.ReadAll(r.Body)
  158. //checkMD5
  159. sum := md5.Sum(buf)
  160. fileDataMD5 := base64.StdEncoding.EncodeToString(sum[0:len(sum)])
  161. if strings.ToLower(fileDataMD5) != strings.ToLower(contentMD5) {
  162. glog.V(0).Infof("fileDataMD5 [%s] is not equal to Content-MD5 [%s]", fileDataMD5, contentMD5)
  163. writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("MD5 check failed"))
  164. return
  165. }
  166. //reconstruct http request body for following new request to volume server
  167. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  168. }
  169. fileName := r.URL.Path[lastPos+1:]
  170. body, contentType, te := makeFormData(fileName, r.Header.Get("Content-Type"), r.Body)
  171. if te != nil {
  172. glog.V(0).Infoln("S3 protocol to raw seaweed protocol failed", te.Error())
  173. writeJsonError(w, r, http.StatusInternalServerError, te)
  174. return
  175. }
  176. if body != nil {
  177. switch v := body.(type) {
  178. case *bytes.Buffer:
  179. r.ContentLength = int64(v.Len())
  180. case *bytes.Reader:
  181. r.ContentLength = int64(v.Len())
  182. case *strings.Reader:
  183. r.ContentLength = int64(v.Len())
  184. }
  185. }
  186. r.Header.Set("Content-Type", contentType)
  187. rc, ok := body.(io.ReadCloser)
  188. if !ok && body != nil {
  189. rc = ioutil.NopCloser(body)
  190. }
  191. r.Body = rc
  192. }
  193. u, _ := url.Parse(urlLocation)
  194. glog.V(4).Infoln("post to", u)
  195. request := &http.Request{
  196. Method: r.Method,
  197. URL: u,
  198. Proto: r.Proto,
  199. ProtoMajor: r.ProtoMajor,
  200. ProtoMinor: r.ProtoMinor,
  201. Header: r.Header,
  202. Body: r.Body,
  203. Host: r.Host,
  204. ContentLength: r.ContentLength,
  205. }
  206. resp, do_err := util.Do(request)
  207. if do_err != nil {
  208. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  209. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  210. return
  211. }
  212. defer resp.Body.Close()
  213. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  214. if ra_err != nil {
  215. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  216. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  217. return
  218. }
  219. glog.V(4).Infoln("post result", string(resp_body))
  220. var ret operation.UploadResult
  221. unmarshal_err := json.Unmarshal(resp_body, &ret)
  222. if unmarshal_err != nil {
  223. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  224. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  225. return
  226. }
  227. if ret.Error != "" {
  228. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  229. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  230. return
  231. }
  232. path := r.URL.Path
  233. if strings.HasSuffix(path, "/") {
  234. if ret.Name != "" {
  235. path += ret.Name
  236. } else {
  237. operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
  238. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  239. writeJsonError(w, r, http.StatusInternalServerError,
  240. errors.New("Can not to write to folder "+path+" without a file name"))
  241. return
  242. }
  243. }
  244. // also delete the old fid unless PUT operation
  245. if r.Method != "PUT" {
  246. if oldFid, err := fs.filer.FindFile(path); err == nil {
  247. operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
  248. }
  249. }
  250. glog.V(4).Infoln("saving", path, "=>", fileId)
  251. if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
  252. operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
  253. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  254. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  255. return
  256. }
  257. reply := FilerPostResult{
  258. Name: ret.Name,
  259. Size: ret.Size,
  260. Error: ret.Error,
  261. Fid: fileId,
  262. Url: urlLocation,
  263. }
  264. writeJsonQuiet(w, r, http.StatusCreated, reply)
  265. }
  266. // curl -X DELETE http://localhost:8888/path/to
  267. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  268. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  269. var err error
  270. var fid string
  271. if strings.HasSuffix(r.URL.Path, "/") {
  272. isRecursive := r.FormValue("recursive") == "true"
  273. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  274. } else {
  275. fid, err = fs.filer.DeleteFile(r.URL.Path)
  276. if err == nil && fid != "" {
  277. err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
  278. }
  279. }
  280. if err == nil {
  281. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  282. } else {
  283. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  284. writeJsonError(w, r, http.StatusInternalServerError, err)
  285. }
  286. }