You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.6 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/security"
  17. "github.com/chrislusf/seaweedfs/weed/stats"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. var (
  21. limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
  22. )
  23. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
  24. md5Hash = md5.New()
  25. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  26. // save small content directly
  27. if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
  28. smallContent, err = ioutil.ReadAll(partReader)
  29. dataSize = int64(len(smallContent))
  30. return
  31. }
  32. resultsChan := make(chan *ChunkCreationResult, 2)
  33. var waitForAllData sync.WaitGroup
  34. waitForAllData.Add(1)
  35. go func() {
  36. // process upload results
  37. defer waitForAllData.Done()
  38. for result := range resultsChan {
  39. if result.err != nil {
  40. err = result.err
  41. continue
  42. }
  43. // Save to chunk manifest structure
  44. fileChunks = append(fileChunks, result.chunk)
  45. }
  46. }()
  47. var lock sync.Mutex
  48. readOffset := int64(0)
  49. var wg sync.WaitGroup
  50. var readErr error
  51. for readErr == nil {
  52. wg.Add(1)
  53. limitedUploadProcessor.Execute(func() {
  54. defer wg.Done()
  55. var localOffset int64
  56. var data []byte
  57. // read from the input
  58. lock.Lock()
  59. localOffset = readOffset
  60. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  61. data, readErr = ioutil.ReadAll(limitedReader)
  62. readOffset += int64(len(data))
  63. lock.Unlock()
  64. // handle read errors
  65. if readErr != nil {
  66. if readErr != io.EOF {
  67. resultsChan <- &ChunkCreationResult{
  68. err: readErr,
  69. }
  70. }
  71. return
  72. }
  73. if len(data) == 0 {
  74. readErr = io.EOF
  75. return
  76. }
  77. // upload
  78. dataReader := util.NewBytesReader(data)
  79. fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
  80. if uploadErr != nil {
  81. resultsChan <- &ChunkCreationResult{
  82. err: uploadErr,
  83. }
  84. return
  85. }
  86. glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
  87. // send back uploaded file chunk
  88. resultsChan <- &ChunkCreationResult{
  89. chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
  90. }
  91. })
  92. }
  93. go func() {
  94. wg.Wait()
  95. close(resultsChan)
  96. }()
  97. waitForAllData.Wait()
  98. return fileChunks, md5Hash, readOffset, err, nil
  99. }
  100. type ChunkCreationResult struct {
  101. chunk *filer_pb.FileChunk
  102. err error
  103. }
  104. func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
  105. // retry to assign a different file id
  106. var fileId, urlLocation string
  107. var auth security.EncodedJwt
  108. var assignErr, uploadErr error
  109. var uploadResult *operation.UploadResult
  110. for i := 0; i < 3; i++ {
  111. // assign one file id for one chunk
  112. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  113. if assignErr != nil {
  114. return "", nil, assignErr
  115. }
  116. // upload the chunk to the volume server
  117. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  118. if uploadErr != nil {
  119. time.Sleep(251 * time.Millisecond)
  120. continue
  121. }
  122. break
  123. }
  124. return fileId, uploadResult, uploadErr
  125. }
  126. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  127. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  128. start := time.Now()
  129. defer func() {
  130. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  131. }()
  132. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  133. if uploadResult != nil && uploadResult.RetryCount > 0 {
  134. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  135. }
  136. return uploadResult, err, data
  137. }