You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
4.7 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/security"
  17. "github.com/chrislusf/seaweedfs/weed/stats"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. var (
  21. limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
  22. )
  23. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
  24. md5Hash = md5.New()
  25. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  26. // save small content directly
  27. if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
  28. smallContent, err = ioutil.ReadAll(partReader)
  29. dataSize = int64(len(smallContent))
  30. return
  31. }
  32. resultsChan := make(chan *ChunkCreationResult, 2)
  33. var waitForAllData sync.WaitGroup
  34. waitForAllData.Add(1)
  35. go func() {
  36. // process upload results
  37. defer waitForAllData.Done()
  38. for result := range resultsChan {
  39. if result.err != nil {
  40. err = result.err
  41. continue
  42. }
  43. // Save to chunk manifest structure
  44. fileChunks = append(fileChunks, result.chunk)
  45. }
  46. }()
  47. var lock sync.Mutex
  48. readOffset := int64(0)
  49. var wg sync.WaitGroup
  50. for err == nil {
  51. wg.Add(1)
  52. request := func() {
  53. defer wg.Done()
  54. var localOffset int64
  55. // read from the input
  56. lock.Lock()
  57. localOffset = readOffset
  58. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  59. data, readErr := ioutil.ReadAll(limitedReader)
  60. readOffset += int64(len(data))
  61. lock.Unlock()
  62. // handle read errors
  63. if readErr != nil {
  64. if readErr != io.EOF {
  65. if err == nil {
  66. err = readErr
  67. }
  68. resultsChan <- &ChunkCreationResult{
  69. err: readErr,
  70. }
  71. }
  72. return
  73. }
  74. if len(data) == 0 {
  75. readErr = io.EOF
  76. return
  77. }
  78. // upload
  79. dataReader := util.NewBytesReader(data)
  80. fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
  81. if uploadErr != nil {
  82. if err == nil {
  83. err = uploadErr
  84. }
  85. resultsChan <- &ChunkCreationResult{
  86. err: uploadErr,
  87. }
  88. return
  89. }
  90. glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
  91. // send back uploaded file chunk
  92. resultsChan <- &ChunkCreationResult{
  93. chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
  94. }
  95. }
  96. limitedUploadProcessor.Execute(request)
  97. }
  98. go func() {
  99. wg.Wait()
  100. close(resultsChan)
  101. }()
  102. waitForAllData.Wait()
  103. return fileChunks, md5Hash, readOffset, err, nil
  104. }
  105. type ChunkCreationResult struct {
  106. chunk *filer_pb.FileChunk
  107. err error
  108. }
  109. func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
  110. // retry to assign a different file id
  111. var fileId, urlLocation string
  112. var auth security.EncodedJwt
  113. var assignErr, uploadErr error
  114. var uploadResult *operation.UploadResult
  115. for i := 0; i < 3; i++ {
  116. // assign one file id for one chunk
  117. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  118. if assignErr != nil {
  119. return "", nil, assignErr
  120. }
  121. // upload the chunk to the volume server
  122. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  123. if uploadErr != nil {
  124. time.Sleep(251 * time.Millisecond)
  125. continue
  126. }
  127. break
  128. }
  129. return fileId, uploadResult, uploadErr
  130. }
  131. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  132. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  133. start := time.Now()
  134. defer func() {
  135. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  136. }()
  137. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  138. if uploadResult != nil && uploadResult.RetryCount > 0 {
  139. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  140. }
  141. return uploadResult, err, data
  142. }