You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 lines
3.7 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "hash"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/filer"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/security"
  16. "github.com/chrislusf/seaweedfs/weed/stats"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. )
  19. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
  20. var fileChunks []*filer_pb.FileChunk
  21. md5Hash := md5.New()
  22. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  23. chunkOffset := int64(0)
  24. var smallContent []byte
  25. for {
  26. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  27. data, err := ioutil.ReadAll(limitedReader)
  28. if err != nil {
  29. return nil, nil, 0, err, nil
  30. }
  31. if chunkOffset == 0 && !isAppend(r) {
  32. if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
  33. smallContent = data
  34. chunkOffset += int64(len(data))
  35. break
  36. }
  37. }
  38. dataReader := util.NewBytesReader(data)
  39. // retry to assign a different file id
  40. var fileId, urlLocation string
  41. var auth security.EncodedJwt
  42. var assignErr, uploadErr error
  43. var uploadResult *operation.UploadResult
  44. for i := 0; i < 3; i++ {
  45. // assign one file id for one chunk
  46. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  47. if assignErr != nil {
  48. return nil, nil, 0, assignErr, nil
  49. }
  50. // upload the chunk to the volume server
  51. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  52. if uploadErr != nil {
  53. time.Sleep(251 * time.Millisecond)
  54. continue
  55. }
  56. break
  57. }
  58. if uploadErr != nil {
  59. return nil, nil, 0, uploadErr, nil
  60. }
  61. // if last chunk exhausted the reader exactly at the border
  62. if uploadResult.Size == 0 {
  63. break
  64. }
  65. if chunkOffset == 0 {
  66. uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5)
  67. readedMd5 := md5Hash.Sum(nil)
  68. if !bytes.Equal(uploadedMd5, readedMd5) {
  69. glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name)
  70. }
  71. }
  72. // Save to chunk manifest structure
  73. fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
  74. glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
  75. // reset variables for the next chunk
  76. chunkOffset = chunkOffset + int64(uploadResult.Size)
  77. // if last chunk was not at full chunk size, but already exhausted the reader
  78. if int64(uploadResult.Size) < int64(chunkSize) {
  79. break
  80. }
  81. }
  82. return fileChunks, md5Hash, chunkOffset, nil, smallContent
  83. }
  84. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  85. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  86. start := time.Now()
  87. defer func() {
  88. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  89. }()
  90. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  91. if uploadResult != nil && uploadResult.RetryCount > 0 {
  92. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  93. }
  94. return uploadResult, err, data
  95. }