You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

342 lines
10 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "mime/multipart"
  12. "net/http"
  13. "net/textproto"
  14. "net/url"
  15. "strings"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/operation"
  18. "github.com/chrislusf/seaweedfs/weed/storage"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/syndtr/goleveldb/leveldb"
  21. )
  // FilerPostResult is the JSON document PostHandler sends back to the client
  // once an upload has been stored. Every field carries `omitempty`, so fields
  // holding their zero value are left out of the encoded output.
  type FilerPostResult struct {
  	// Name is the file name taken from the volume server's upload result.
  	Name string `json:"name,omitempty"`
  	// Size is the size taken from the volume server's upload result.
  	Size uint32 `json:"size,omitempty"`
  	// Error is the error text taken from the volume server's upload result.
  	Error string `json:"error,omitempty"`
  	// Fid is the file id the content was stored under.
  	Fid string `json:"fid,omitempty"`
  	// Url is the volume server URL the content was posted to.
  	Url string `json:"url,omitempty"`
  }
  // quoteEscaper rewrites a backslash as two backslashes and a double quote as
  // backslash + double quote, so the result can sit inside a quoted header
  // parameter value.
  var quoteEscaper = strings.NewReplacer(
  	`\`, `\\`,
  	`"`, `\"`,
  )

  // escapeQuotes returns s with every backslash and double quote
  // backslash-escaped; all other characters are passed through untouched.
  func escapeQuotes(s string) string {
  	escaped := quoteEscaper.Replace(s)
  	return escaped
  }
  33. func createFormFile(writer *multipart.Writer, fieldname, filename, mime string) (io.Writer, error) {
  34. h := make(textproto.MIMEHeader)
  35. h.Set("Content-Disposition",
  36. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  37. escapeQuotes(fieldname), escapeQuotes(filename)))
  38. if len(mime) == 0 {
  39. mime = "application/octet-stream"
  40. }
  41. h.Set("Content-Type", mime)
  42. return writer.CreatePart(h)
  43. }
  44. func makeFormData(filename, mimeType string, content io.Reader) (formData io.Reader, contentType string, err error) {
  45. buf := new(bytes.Buffer)
  46. writer := multipart.NewWriter(buf)
  47. defer writer.Close()
  48. part, err := createFormFile(writer, "file", filename, mimeType)
  49. if err != nil {
  50. glog.V(0).Infoln(err)
  51. return
  52. }
  53. _, err = io.Copy(part, content)
  54. if err != nil {
  55. glog.V(0).Infoln(err)
  56. return
  57. }
  58. formData = buf
  59. contentType = writer.FormDataContentType()
  60. return
  61. }
  62. func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
  63. if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound {
  64. glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
  65. writeJsonError(w, r, http.StatusInternalServerError, err)
  66. return
  67. } else if fileId != "" && err == nil {
  68. urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
  69. if err != nil {
  70. glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
  71. w.WriteHeader(http.StatusNotFound)
  72. return
  73. }
  74. }
  75. return
  76. }
  77. func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  78. ar := &operation.VolumeAssignRequest{
  79. Count: 1,
  80. Replication: replication,
  81. Collection: collection,
  82. Ttl: r.URL.Query().Get("ttl"),
  83. }
  84. assignResult, ae := operation.Assign(fs.getMasterNode(), ar)
  85. if ae != nil {
  86. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  87. writeJsonError(w, r, http.StatusInternalServerError, ae)
  88. err = ae
  89. return
  90. }
  91. fileId = assignResult.Fid
  92. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  93. return
  94. }
  95. func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  96. //Default handle way for http multipart
  97. if r.Method == "PUT" {
  98. buf, _ := ioutil.ReadAll(r.Body)
  99. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  100. fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
  101. if pe != nil {
  102. glog.V(0).Infoln("failing to parse post body", pe.Error())
  103. writeJsonError(w, r, http.StatusInternalServerError, pe)
  104. err = pe
  105. return
  106. }
  107. //reconstruct http request body for following new request to volume server
  108. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  109. path := r.URL.Path
  110. if strings.HasSuffix(path, "/") {
  111. if fileName != "" {
  112. path += fileName
  113. }
  114. }
  115. fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
  116. } else {
  117. fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
  118. }
  119. return
  120. }
  121. func multipartHttpBodyBuilder(w http.ResponseWriter, r *http.Request, fileName string) (err error) {
  122. body, contentType, te := makeFormData(fileName, r.Header.Get("Content-Type"), r.Body)
  123. if te != nil {
  124. glog.V(0).Infoln("S3 protocol to raw seaweed protocol failed", te.Error())
  125. writeJsonError(w, r, http.StatusInternalServerError, te)
  126. err = te
  127. return
  128. }
  129. if body != nil {
  130. switch v := body.(type) {
  131. case *bytes.Buffer:
  132. r.ContentLength = int64(v.Len())
  133. case *bytes.Reader:
  134. r.ContentLength = int64(v.Len())
  135. case *strings.Reader:
  136. r.ContentLength = int64(v.Len())
  137. }
  138. }
  139. r.Header.Set("Content-Type", contentType)
  140. rc, ok := body.(io.ReadCloser)
  141. if !ok && body != nil {
  142. rc = ioutil.NopCloser(body)
  143. }
  144. r.Body = rc
  145. return
  146. }
  147. func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
  148. if contentMD5 := r.Header.Get("Content-MD5"); contentMD5 != "" {
  149. buf, _ := ioutil.ReadAll(r.Body)
  150. //checkMD5
  151. sum := md5.Sum(buf)
  152. fileDataMD5 := base64.StdEncoding.EncodeToString(sum[0:len(sum)])
  153. if strings.ToLower(fileDataMD5) != strings.ToLower(contentMD5) {
  154. glog.V(0).Infof("fileDataMD5 [%s] is not equal to Content-MD5 [%s]", fileDataMD5, contentMD5)
  155. err = fmt.Errorf("MD5 check failed")
  156. writeJsonError(w, r, http.StatusNotAcceptable, err)
  157. return
  158. }
  159. //reconstruct http request body for following new request to volume server
  160. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  161. }
  162. return
  163. }
  164. func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  165. /*
  166. Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
  167. There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
  168. a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
  169. 1. The request url format should be http://$host:$port/$bucketName/$objectName
  170. 2. bucketName will be mapped to seaweedfs's collection name
  171. 3. You could customize and make your enhancement.
  172. */
  173. lastPos := strings.LastIndex(r.URL.Path, "/")
  174. if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
  175. glog.V(0).Infoln("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
  176. err = fmt.Errorf("URL Path is invalid")
  177. writeJsonError(w, r, http.StatusInternalServerError, err)
  178. return
  179. }
  180. if err = checkContentMD5(w, r); err != nil {
  181. return
  182. }
  183. fileName := r.URL.Path[lastPos+1:]
  184. if err = multipartHttpBodyBuilder(w, r, fileName); err != nil {
  185. return
  186. }
  187. secondPos := strings.Index(r.URL.Path[1:], "/") + 1
  188. collection = r.URL.Path[1:secondPos]
  189. path := r.URL.Path
  190. if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
  191. fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
  192. }
  193. return
  194. }
  195. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  196. query := r.URL.Query()
  197. replication := query.Get("replication")
  198. if replication == "" {
  199. replication = fs.defaultReplication
  200. }
  201. collection := query.Get("collection")
  202. if collection == "" {
  203. collection = fs.collection
  204. }
  205. var fileId, urlLocation string
  206. var err error
  207. if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
  208. fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection)
  209. if err != nil {
  210. return
  211. }
  212. } else {
  213. fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection)
  214. if err != nil {
  215. return
  216. }
  217. }
  218. u, _ := url.Parse(urlLocation)
  219. glog.V(4).Infoln("post to", u)
  220. request := &http.Request{
  221. Method: r.Method,
  222. URL: u,
  223. Proto: r.Proto,
  224. ProtoMajor: r.ProtoMajor,
  225. ProtoMinor: r.ProtoMinor,
  226. Header: r.Header,
  227. Body: r.Body,
  228. Host: r.Host,
  229. ContentLength: r.ContentLength,
  230. }
  231. resp, do_err := util.Do(request)
  232. if do_err != nil {
  233. glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
  234. writeJsonError(w, r, http.StatusInternalServerError, do_err)
  235. return
  236. }
  237. defer resp.Body.Close()
  238. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  239. if ra_err != nil {
  240. glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
  241. writeJsonError(w, r, http.StatusInternalServerError, ra_err)
  242. return
  243. }
  244. glog.V(4).Infoln("post result", string(resp_body))
  245. var ret operation.UploadResult
  246. unmarshal_err := json.Unmarshal(resp_body, &ret)
  247. if unmarshal_err != nil {
  248. glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
  249. writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
  250. return
  251. }
  252. if ret.Error != "" {
  253. glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
  254. writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
  255. return
  256. }
  257. path := r.URL.Path
  258. if strings.HasSuffix(path, "/") {
  259. if ret.Name != "" {
  260. path += ret.Name
  261. } else {
  262. operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
  263. glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
  264. writeJsonError(w, r, http.StatusInternalServerError,
  265. errors.New("Can not to write to folder "+path+" without a file name"))
  266. return
  267. }
  268. }
  269. // also delete the old fid unless PUT operation
  270. if r.Method != "PUT" {
  271. if oldFid, err := fs.filer.FindFile(path); err == nil {
  272. operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
  273. }
  274. }
  275. glog.V(4).Infoln("saving", path, "=>", fileId)
  276. if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
  277. operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
  278. glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
  279. writeJsonError(w, r, http.StatusInternalServerError, db_err)
  280. return
  281. }
  282. reply := FilerPostResult{
  283. Name: ret.Name,
  284. Size: ret.Size,
  285. Error: ret.Error,
  286. Fid: fileId,
  287. Url: urlLocation,
  288. }
  289. writeJsonQuiet(w, r, http.StatusCreated, reply)
  290. }
  291. // curl -X DELETE http://localhost:8888/path/to
  292. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  293. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  294. var err error
  295. var fid string
  296. if strings.HasSuffix(r.URL.Path, "/") {
  297. isRecursive := r.FormValue("recursive") == "true"
  298. err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
  299. } else {
  300. fid, err = fs.filer.DeleteFile(r.URL.Path)
  301. if err == nil && fid != "" {
  302. err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
  303. }
  304. }
  305. if err == nil {
  306. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  307. } else {
  308. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  309. writeJsonError(w, r, http.StatusInternalServerError, err)
  310. }
  311. }