You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
4.6 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "math/rand"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/security"
  17. "github.com/chrislusf/seaweedfs/weed/stats"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
  21. md5Hash = md5.New()
  22. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  23. // save small content directly
  24. if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
  25. smallContent, err = ioutil.ReadAll(partReader)
  26. dataSize = int64(len(smallContent))
  27. return
  28. }
  29. resultsChan := make(chan *ChunkCreationResult, operation.ConcurrentUploadLimit)
  30. var waitForAllData sync.WaitGroup
  31. waitForAllData.Add(1)
  32. go func() {
  33. // process upload results
  34. defer waitForAllData.Done()
  35. for result := range resultsChan {
  36. if result.err != nil {
  37. err = result.err
  38. continue
  39. }
  40. // Save to chunk manifest structure
  41. fileChunks = append(fileChunks, result.chunk)
  42. }
  43. }()
  44. var lock sync.Mutex
  45. readOffset := int64(0)
  46. var wg sync.WaitGroup
  47. var readErr error
  48. for readErr == nil {
  49. wg.Add(1)
  50. operation.AsyncOutOfOrderProcess(rand.Uint32(), func() {
  51. defer wg.Done()
  52. var localOffset int64
  53. var data []byte
  54. // read from the input
  55. lock.Lock()
  56. localOffset = readOffset
  57. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  58. data, readErr = ioutil.ReadAll(limitedReader)
  59. readOffset += int64(len(data))
  60. lock.Unlock()
  61. // handle read errors
  62. if readErr != nil {
  63. if readErr != io.EOF {
  64. resultsChan <- &ChunkCreationResult{
  65. err: readErr,
  66. }
  67. }
  68. return
  69. }
  70. if len(data) == 0 {
  71. readErr = io.EOF
  72. return
  73. }
  74. // upload
  75. dataReader := util.NewBytesReader(data)
  76. fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
  77. if uploadErr != nil {
  78. resultsChan <- &ChunkCreationResult{
  79. err: uploadErr,
  80. }
  81. return
  82. }
  83. glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
  84. // send back uploaded file chunk
  85. resultsChan <- &ChunkCreationResult{
  86. chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
  87. }
  88. })
  89. }
  90. go func() {
  91. wg.Wait()
  92. close(resultsChan)
  93. }()
  94. waitForAllData.Wait()
  95. return fileChunks, md5Hash, readOffset, err, nil
  96. }
  97. type ChunkCreationResult struct {
  98. chunk *filer_pb.FileChunk
  99. err error
  100. }
  101. func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
  102. // retry to assign a different file id
  103. var fileId, urlLocation string
  104. var auth security.EncodedJwt
  105. var assignErr, uploadErr error
  106. var uploadResult *operation.UploadResult
  107. for i := 0; i < 3; i++ {
  108. // assign one file id for one chunk
  109. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  110. if assignErr != nil {
  111. return "", nil, assignErr
  112. }
  113. // upload the chunk to the volume server
  114. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  115. if uploadErr != nil {
  116. time.Sleep(251 * time.Millisecond)
  117. continue
  118. }
  119. break
  120. }
  121. return fileId, uploadResult, uploadErr
  122. }
  123. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  124. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  125. start := time.Now()
  126. defer func() {
  127. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  128. }()
  129. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  130. if uploadResult != nil && uploadResult.RetryCount > 0 {
  131. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  132. }
  133. return uploadResult, err, data
  134. }