You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

548 lines
16 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/base64"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "mime/multipart"
  12. "net/http"
  13. "net/textproto"
  14. "net/url"
  15. "path"
  16. "strconv"
  17. "strings"
  18. "github.com/chrislusf/seaweedfs/weed/filer"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/operation"
  21. "github.com/chrislusf/seaweedfs/weed/storage"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/filer2"
  24. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  25. "time"
  26. )
// FilerPostResult is the JSON reply returned to the client after a POST
// to the filer has been proxied to a volume server.
type FilerPostResult struct {
	Name  string `json:"name,omitempty"`  // stored file name, as reported by the volume server
	Size  uint32 `json:"size,omitempty"`  // uploaded size in bytes
	Error string `json:"error,omitempty"` // error message from the upload, if any
	Fid   string `json:"fid,omitempty"`   // assigned file id
	Url   string `json:"url,omitempty"`   // volume server location for the file
}
  34. var quoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
  35. func escapeQuotes(s string) string {
  36. return quoteEscaper.Replace(s)
  37. }
  38. func createFormFile(writer *multipart.Writer, fieldname, filename, mime string) (io.Writer, error) {
  39. h := make(textproto.MIMEHeader)
  40. h.Set("Content-Disposition",
  41. fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
  42. escapeQuotes(fieldname), escapeQuotes(filename)))
  43. if len(mime) == 0 {
  44. mime = "application/octet-stream"
  45. }
  46. h.Set("Content-Type", mime)
  47. return writer.CreatePart(h)
  48. }
  49. func makeFormData(filename, mimeType string, content io.Reader) (formData io.Reader, contentType string, err error) {
  50. buf := new(bytes.Buffer)
  51. writer := multipart.NewWriter(buf)
  52. defer writer.Close()
  53. part, err := createFormFile(writer, "file", filename, mimeType)
  54. if err != nil {
  55. glog.V(0).Infoln(err)
  56. return
  57. }
  58. _, err = io.Copy(part, content)
  59. if err != nil {
  60. glog.V(0).Infoln(err)
  61. return
  62. }
  63. formData = buf
  64. contentType = writer.FormDataContentType()
  65. return
  66. }
  67. func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
  68. var found bool
  69. var entry *filer2.Entry
  70. if found, entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
  71. glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
  72. writeJsonError(w, r, http.StatusInternalServerError, err)
  73. } else if found {
  74. fileId = entry.Chunks[0].FileId
  75. urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
  76. if err != nil {
  77. glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
  78. w.WriteHeader(http.StatusNotFound)
  79. }
  80. } else {
  81. w.WriteHeader(http.StatusNotFound)
  82. }
  83. return
  84. }
  85. func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  86. ar := &operation.VolumeAssignRequest{
  87. Count: 1,
  88. Replication: replication,
  89. Collection: collection,
  90. Ttl: r.URL.Query().Get("ttl"),
  91. }
  92. assignResult, ae := operation.Assign(fs.getMasterNode(), ar)
  93. if ae != nil {
  94. glog.V(0).Infoln("failing to assign a file id", ae.Error())
  95. writeJsonError(w, r, http.StatusInternalServerError, ae)
  96. err = ae
  97. return
  98. }
  99. fileId = assignResult.Fid
  100. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  101. return
  102. }
  103. func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  104. //Default handle way for http multipart
  105. if r.Method == "PUT" {
  106. buf, _ := ioutil.ReadAll(r.Body)
  107. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  108. fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
  109. if pe != nil {
  110. glog.V(0).Infoln("failing to parse post body", pe.Error())
  111. writeJsonError(w, r, http.StatusInternalServerError, pe)
  112. err = pe
  113. return
  114. }
  115. //reconstruct http request body for following new request to volume server
  116. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  117. path := r.URL.Path
  118. if strings.HasSuffix(path, "/") {
  119. if fileName != "" {
  120. path += fileName
  121. }
  122. }
  123. fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
  124. } else {
  125. fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
  126. }
  127. return
  128. }
  129. func multipartHttpBodyBuilder(w http.ResponseWriter, r *http.Request, fileName string) (err error) {
  130. body, contentType, te := makeFormData(fileName, r.Header.Get("Content-Type"), r.Body)
  131. if te != nil {
  132. glog.V(0).Infoln("S3 protocol to raw seaweed protocol failed", te.Error())
  133. writeJsonError(w, r, http.StatusInternalServerError, te)
  134. err = te
  135. return
  136. }
  137. if body != nil {
  138. switch v := body.(type) {
  139. case *bytes.Buffer:
  140. r.ContentLength = int64(v.Len())
  141. case *bytes.Reader:
  142. r.ContentLength = int64(v.Len())
  143. case *strings.Reader:
  144. r.ContentLength = int64(v.Len())
  145. }
  146. }
  147. r.Header.Set("Content-Type", contentType)
  148. rc, ok := body.(io.ReadCloser)
  149. if !ok && body != nil {
  150. rc = ioutil.NopCloser(body)
  151. }
  152. r.Body = rc
  153. return
  154. }
  155. func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
  156. if contentMD5 := r.Header.Get("Content-MD5"); contentMD5 != "" {
  157. buf, _ := ioutil.ReadAll(r.Body)
  158. //checkMD5
  159. sum := md5.Sum(buf)
  160. fileDataMD5 := base64.StdEncoding.EncodeToString(sum[0:len(sum)])
  161. if strings.ToLower(fileDataMD5) != strings.ToLower(contentMD5) {
  162. glog.V(0).Infof("fileDataMD5 [%s] is not equal to Content-MD5 [%s]", fileDataMD5, contentMD5)
  163. err = fmt.Errorf("MD5 check failed")
  164. writeJsonError(w, r, http.StatusNotAcceptable, err)
  165. return
  166. }
  167. //reconstruct http request body for following new request to volume server
  168. r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
  169. }
  170. return
  171. }
  172. func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string) (fileId, urlLocation string, err error) {
  173. /*
  174. Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
  175. There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
  176. a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
  177. 1. The request url format should be http://$host:$port/$bucketName/$objectName
  178. 2. bucketName will be mapped to seaweedfs's collection name
  179. 3. You could customize and make your enhancement.
  180. */
  181. lastPos := strings.LastIndex(r.URL.Path, "/")
  182. if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
  183. glog.V(0).Infoln("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
  184. err = fmt.Errorf("URL Path is invalid")
  185. writeJsonError(w, r, http.StatusInternalServerError, err)
  186. return
  187. }
  188. if err = checkContentMD5(w, r); err != nil {
  189. return
  190. }
  191. fileName := r.URL.Path[lastPos+1:]
  192. if err = multipartHttpBodyBuilder(w, r, fileName); err != nil {
  193. return
  194. }
  195. secondPos := strings.Index(r.URL.Path[1:], "/") + 1
  196. collection = r.URL.Path[1:secondPos]
  197. path := r.URL.Path
  198. if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
  199. fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection)
  200. }
  201. return
  202. }
// PostHandler accepts a file upload (multipart form or S3-style monolithic
// body), proxies it to a volume server, and records the resulting chunk in
// the filer store under the request path.
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	// Replication and collection fall back to the server-wide defaults.
	replication := query.Get("replication")
	if replication == "" {
		replication = fs.defaultReplication
	}
	collection := query.Get("collection")
	if collection == "" {
		collection = fs.collection
	}
	// Large POSTs may be split transparently; autoChunk writes the reply
	// itself when it takes over.
	if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked {
		return
	}
	var fileId, urlLocation string
	var err error
	if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
		fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection)
		if err != nil {
			// The analyzer already wrote an error response.
			return
		}
	} else {
		fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection)
		if err != nil {
			return
		}
	}
	u, _ := url.Parse(urlLocation)
	// This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
	// because they need to provide FIDs instead of file paths...
	cm, _ := strconv.ParseBool(query.Get("cm"))
	if cm {
		q := u.Query()
		q.Set("cm", "true")
		u.RawQuery = q.Encode()
	}
	glog.V(4).Infoln("post to", u)
	// Proxy the (possibly rebuilt) request body to the volume server,
	// preserving the original method, headers, and content length.
	request := &http.Request{
		Method:        r.Method,
		URL:           u,
		Proto:         r.Proto,
		ProtoMajor:    r.ProtoMajor,
		ProtoMinor:    r.ProtoMinor,
		Header:        r.Header,
		Body:          r.Body,
		Host:          r.Host,
		ContentLength: r.ContentLength,
	}
	resp, do_err := util.Do(request)
	if do_err != nil {
		glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
		writeJsonError(w, r, http.StatusInternalServerError, do_err)
		return
	}
	defer resp.Body.Close()
	resp_body, ra_err := ioutil.ReadAll(resp.Body)
	if ra_err != nil {
		glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
		writeJsonError(w, r, http.StatusInternalServerError, ra_err)
		return
	}
	glog.V(4).Infoln("post result", string(resp_body))
	var ret operation.UploadResult
	unmarshal_err := json.Unmarshal(resp_body, &ret)
	if unmarshal_err != nil {
		// NOTE(review): "resonse" is a typo in this log literal; left as-is
		// since a doc-only pass must not alter runtime strings.
		glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
		writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
		return
	}
	if ret.Error != "" {
		glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
		writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
		return
	}
	// A trailing slash means "store under this directory"; the file name
	// must then come from the upload itself, otherwise the already-written
	// blob is deleted again.
	path := r.URL.Path
	if strings.HasSuffix(path, "/") {
		if ret.Name != "" {
			path += ret.Name
		} else {
			operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
			glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
			writeJsonError(w, r, http.StatusInternalServerError,
				errors.New("Can not to write to folder "+path+" without a file name"))
			return
		}
	}
	// also delete the old fid unless PUT operation
	// (PUT reused the existing fid, so there is nothing stale to remove.)
	if r.Method != "PUT" {
		// NOTE(review): err here shadows the outer err (intentional-looking);
		// also this compares against filer.ErrNotFound while the lookup goes
		// through fs.filer (filer2) — confirm the sentinel actually matches.
		if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil && found {
			oldFid := entry.Chunks[0].FileId
			operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
		} else if err != nil && err != filer.ErrNotFound {
			glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
		}
	}
	glog.V(4).Infoln("saving", path, "=>", fileId)
	// Record the uploaded blob as a single-chunk entry in the filer store.
	entry := &filer2.Entry{
		FullPath: filer2.FullPath(path),
		Attr: filer2.Attr{
			Mode: 0660,
		},
		Chunks: []*filer_pb.FileChunk{{
			FileId: fileId,
			Size:   uint64(r.ContentLength),
			Mtime:  time.Now().UnixNano(),
		}},
	}
	if db_err := fs.filer.CreateEntry(entry); db_err != nil {
		// The blob is orphaned if the metadata write fails; remove it.
		operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
		glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
		writeJsonError(w, r, http.StatusInternalServerError, db_err)
		return
	}
	reply := FilerPostResult{
		Name:  ret.Name,
		Size:  ret.Size,
		Error: ret.Error,
		Fid:   fileId,
		Url:   urlLocation,
	}
	writeJsonQuiet(w, r, http.StatusCreated, reply)
}
  324. func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool {
  325. if r.Method != "POST" {
  326. glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
  327. return false
  328. }
  329. // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
  330. query := r.URL.Query()
  331. parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32)
  332. maxMB := int32(parsedMaxMB)
  333. if maxMB <= 0 && fs.maxMB > 0 {
  334. maxMB = int32(fs.maxMB)
  335. }
  336. if maxMB <= 0 {
  337. glog.V(4).Infoln("AutoChunking not enabled")
  338. return false
  339. }
  340. glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
  341. chunkSize := 1024 * 1024 * maxMB
  342. contentLength := int64(0)
  343. if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
  344. contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
  345. if contentLength <= int64(chunkSize) {
  346. glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
  347. return false
  348. }
  349. }
  350. if contentLength <= 0 {
  351. glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
  352. return false
  353. }
  354. reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection)
  355. if err != nil {
  356. writeJsonError(w, r, http.StatusInternalServerError, err)
  357. } else if reply != nil {
  358. writeJsonQuiet(w, r, http.StatusCreated, reply)
  359. }
  360. return true
  361. }
// doAutoChunk streams the first multipart part of r into fixed-size chunks,
// uploads each chunk to a freshly assigned volume-server location, and
// records the chunk list as one entry in the filer store. On success the
// returned FilerPostResult carries the file name; any upload or store error
// aborts the whole operation.
func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) {
	multipartReader, multipartReaderErr := r.MultipartReader()
	if multipartReaderErr != nil {
		return nil, multipartReaderErr
	}
	// Only the first part is consumed; any additional parts are ignored.
	part1, part1Err := multipartReader.NextPart()
	if part1Err != nil {
		return nil, part1Err
	}
	fileName := part1.FileName()
	if fileName != "" {
		// Strip any client-supplied directory components.
		fileName = path.Base(fileName)
	}
	var fileChunks []*filer_pb.FileChunk
	totalBytesRead := int64(0)
	// Data is read in 1 MiB slices into tmpBuffer, then accumulated into
	// chunkBuf until a full chunk is ready to upload.
	tmpBufferSize := int32(1024 * 1024)
	tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
	chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
	chunkBufOffset := int32(0)
	chunkOffset := int64(0)
	writtenChunks := 0
	filerResult = &FilerPostResult{
		Name: fileName,
	}
	for totalBytesRead < contentLength {
		tmpBuffer.Reset()
		bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
		// io.EOF from CopyN means the part ended before a full 1 MiB slice.
		readFully := readErr != nil && readErr == io.EOF
		tmpBuf := tmpBuffer.Bytes()
		bytesToCopy := tmpBuf[0:int(bytesRead)]
		copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
		chunkBufOffset = chunkBufOffset + int32(bytesRead)
		// Flush when a chunk is full, the input is exhausted, or a stalled
		// read left buffered data behind.
		if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
			writtenChunks = writtenChunks + 1
			fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection)
			if assignErr != nil {
				return nil, assignErr
			}
			// upload the chunk to the volume server
			chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
			uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
			if uploadErr != nil {
				// NOTE(review): chunks already uploaded are not cleaned up
				// on failure here — confirm whether that leak is intended.
				return nil, uploadErr
			}
			// Save to chunk manifest structure
			fileChunks = append(fileChunks,
				&filer_pb.FileChunk{
					FileId: fileId,
					Offset: chunkOffset,
					Size:   uint64(chunkBufOffset),
					Mtime:  time.Now().UnixNano(),
				},
			)
			// reset variables for the next chunk
			chunkBufOffset = 0
			chunkOffset = totalBytesRead + int64(bytesRead)
		}
		totalBytesRead = totalBytesRead + int64(bytesRead)
		if bytesRead == 0 || readFully {
			break
		}
		if readErr != nil {
			return nil, readErr
		}
	}
	path := r.URL.Path
	// also delete the old fid unless PUT operation
	if r.Method != "PUT" {
		if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); found && err == nil {
			for _, chunk := range entry.Chunks {
				oldFid := chunk.FileId
				operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
			}
		} else if err != nil {
			glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
		}
	}
	glog.V(4).Infoln("saving", path)
	// Persist the full chunk list as a single entry.
	entry := &filer2.Entry{
		FullPath: filer2.FullPath(path),
		Attr: filer2.Attr{
			Mode: 0660,
		},
		Chunks: fileChunks,
	}
	if db_err := fs.filer.CreateEntry(entry); db_err != nil {
		replyerr = db_err
		filerResult.Error = db_err.Error()
		glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
		return
	}
	return
}
  455. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
  456. err = nil
  457. ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
  458. uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
  459. if uploadResult != nil {
  460. glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
  461. }
  462. if uploadError != nil {
  463. err = uploadError
  464. }
  465. return
  466. }
  467. // curl -X DELETE http://localhost:8888/path/to
  468. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  469. entry, err := fs.filer.DeleteEntry(filer2.FullPath(r.URL.Path))
  470. if err != nil {
  471. glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
  472. writeJsonError(w, r, http.StatusInternalServerError, err)
  473. return
  474. }
  475. if entry != nil && !entry.IsDirectory() {
  476. for _, chunk := range entry.Chunks {
  477. oldFid := chunk.FileId
  478. operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
  479. }
  480. }
  481. writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
  482. }