You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

175 lines
4.7 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/security"
  17. "github.com/chrislusf/seaweedfs/weed/stats"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. var (
  21. limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
  22. )
  23. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
  24. md5Hash = md5.New()
  25. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  26. // save small content directly
  27. if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
  28. smallContent, err = ioutil.ReadAll(partReader)
  29. dataSize = int64(len(smallContent))
  30. return
  31. }
  32. resultsChan := make(chan *ChunkCreationResult, 2)
  33. var waitForAllData sync.WaitGroup
  34. waitForAllData.Add(1)
  35. go func() {
  36. // process upload results
  37. defer waitForAllData.Done()
  38. for result := range resultsChan {
  39. if result.err != nil {
  40. err = result.err
  41. continue
  42. }
  43. // Save to chunk manifest structure
  44. fileChunks = append(fileChunks, result.chunk)
  45. }
  46. }()
  47. var lock sync.Mutex
  48. readOffset := int64(0)
  49. var wg sync.WaitGroup
  50. for err == nil {
  51. wg.Add(1)
  52. request := func() {
  53. defer wg.Done()
  54. var localOffset int64
  55. // read from the input
  56. lock.Lock()
  57. localOffset = readOffset
  58. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  59. data, readErr := ioutil.ReadAll(limitedReader)
  60. readOffset += int64(len(data))
  61. lock.Unlock()
  62. // handle read errors
  63. if readErr != nil {
  64. if err == nil {
  65. err = readErr
  66. }
  67. if readErr != io.EOF {
  68. resultsChan <- &ChunkCreationResult{
  69. err: readErr,
  70. }
  71. }
  72. return
  73. }
  74. if len(data) == 0 {
  75. readErr = io.EOF
  76. if err == nil {
  77. err = readErr
  78. }
  79. return
  80. }
  81. // upload
  82. dataReader := util.NewBytesReader(data)
  83. fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
  84. if uploadErr != nil {
  85. if err == nil {
  86. err = uploadErr
  87. }
  88. resultsChan <- &ChunkCreationResult{
  89. err: uploadErr,
  90. }
  91. return
  92. }
  93. glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
  94. // send back uploaded file chunk
  95. resultsChan <- &ChunkCreationResult{
  96. chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
  97. }
  98. }
  99. limitedUploadProcessor.Execute(request)
  100. }
  101. go func() {
  102. wg.Wait()
  103. close(resultsChan)
  104. }()
  105. waitForAllData.Wait()
  106. if err == io.EOF {
  107. err = nil
  108. }
  109. return fileChunks, md5Hash, readOffset, err, nil
  110. }
  111. type ChunkCreationResult struct {
  112. chunk *filer_pb.FileChunk
  113. err error
  114. }
  115. func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
  116. // retry to assign a different file id
  117. var fileId, urlLocation string
  118. var auth security.EncodedJwt
  119. var assignErr, uploadErr error
  120. var uploadResult *operation.UploadResult
  121. for i := 0; i < 3; i++ {
  122. // assign one file id for one chunk
  123. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  124. if assignErr != nil {
  125. return "", nil, assignErr
  126. }
  127. // upload the chunk to the volume server
  128. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  129. if uploadErr != nil {
  130. time.Sleep(251 * time.Millisecond)
  131. continue
  132. }
  133. break
  134. }
  135. return fileId, uploadResult, uploadErr
  136. }
  137. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  138. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  139. start := time.Now()
  140. defer func() {
  141. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  142. }()
  143. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  144. if uploadResult != nil && uploadResult.RetryCount > 0 {
  145. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  146. }
  147. return uploadResult, err, data
  148. }