You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
3.4 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/stats"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
  19. var fileChunks []*filer_pb.FileChunk
  20. md5Hash := md5.New()
  21. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  22. chunkOffset := int64(0)
  23. var smallContent []byte
  24. for {
  25. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  26. data, err := ioutil.ReadAll(limitedReader)
  27. if err != nil {
  28. return nil, nil, 0, err, nil
  29. }
  30. if chunkOffset == 0 && !isAppend(r) {
  31. if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
  32. smallContent = data
  33. chunkOffset += int64(len(data))
  34. break
  35. }
  36. }
  37. dataReader := util.NewBytesReader(data)
  38. // retry to assign a different file id
  39. var fileId, urlLocation string
  40. var auth security.EncodedJwt
  41. var assignErr, uploadErr error
  42. var uploadResult *operation.UploadResult
  43. for i := 0; i < 3; i++ {
  44. // assign one file id for one chunk
  45. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  46. if assignErr != nil {
  47. return nil, nil, 0, assignErr, nil
  48. }
  49. // upload the chunk to the volume server
  50. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  51. if uploadErr != nil {
  52. time.Sleep(251 * time.Millisecond)
  53. continue
  54. }
  55. break
  56. }
  57. if uploadErr != nil {
  58. return nil, nil, 0, uploadErr, nil
  59. }
  60. // if last chunk exhausted the reader exactly at the border
  61. if uploadResult.Size == 0 {
  62. break
  63. }
  64. // Save to chunk manifest structure
  65. fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
  66. glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
  67. // reset variables for the next chunk
  68. chunkOffset = chunkOffset + int64(uploadResult.Size)
  69. // if last chunk was not at full chunk size, but already exhausted the reader
  70. if int64(uploadResult.Size) < int64(chunkSize) {
  71. break
  72. }
  73. }
  74. return fileChunks, md5Hash, chunkOffset, nil, smallContent
  75. }
  76. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  77. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  78. start := time.Now()
  79. defer func() {
  80. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  81. }()
  82. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  83. if uploadResult != nil && uploadResult.RetryCount > 0 {
  84. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  85. }
  86. return uploadResult, err, data
  87. }