You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

164 lines
4.6 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "crypto/md5"
  4. "hash"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/operation"
  15. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  16. "github.com/chrislusf/seaweedfs/weed/security"
  17. "github.com/chrislusf/seaweedfs/weed/stats"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. )
  20. var (
  21. limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
  22. )
  23. func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
  24. md5Hash = md5.New()
  25. var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
  26. // save small content directly
  27. if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
  28. smallContent, err = ioutil.ReadAll(partReader)
  29. dataSize = int64(len(smallContent))
  30. return
  31. }
  32. resultsChan := make(chan *ChunkCreationResult, 2)
  33. var waitForAllData sync.WaitGroup
  34. waitForAllData.Add(1)
  35. go func() {
  36. // process upload results
  37. defer waitForAllData.Done()
  38. for result := range resultsChan {
  39. if result.err != nil {
  40. err = result.err
  41. continue
  42. }
  43. // Save to chunk manifest structure
  44. fileChunks = append(fileChunks, result.chunk)
  45. }
  46. }()
  47. var lock sync.Mutex
  48. readOffset := int64(0)
  49. var wg sync.WaitGroup
  50. var readErr error
  51. for readErr == nil {
  52. wg.Add(1)
  53. request := func() {
  54. defer wg.Done()
  55. var localOffset int64
  56. var data []byte
  57. // read from the input
  58. lock.Lock()
  59. localOffset = readOffset
  60. limitedReader := io.LimitReader(partReader, int64(chunkSize))
  61. data, readErr = ioutil.ReadAll(limitedReader)
  62. readOffset += int64(len(data))
  63. lock.Unlock()
  64. // handle read errors
  65. if readErr != nil {
  66. if readErr != io.EOF {
  67. resultsChan <- &ChunkCreationResult{
  68. err: readErr,
  69. }
  70. }
  71. return
  72. }
  73. if len(data) == 0 {
  74. readErr = io.EOF
  75. return
  76. }
  77. // upload
  78. dataReader := util.NewBytesReader(data)
  79. fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
  80. if uploadErr != nil {
  81. resultsChan <- &ChunkCreationResult{
  82. err: uploadErr,
  83. }
  84. return
  85. }
  86. glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
  87. // send back uploaded file chunk
  88. resultsChan <- &ChunkCreationResult{
  89. chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
  90. }
  91. }
  92. limitedUploadProcessor.Execute(request)
  93. }
  94. go func() {
  95. wg.Wait()
  96. close(resultsChan)
  97. }()
  98. waitForAllData.Wait()
  99. return fileChunks, md5Hash, readOffset, err, nil
  100. }
  101. type ChunkCreationResult struct {
  102. chunk *filer_pb.FileChunk
  103. err error
  104. }
  105. func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
  106. // retry to assign a different file id
  107. var fileId, urlLocation string
  108. var auth security.EncodedJwt
  109. var assignErr, uploadErr error
  110. var uploadResult *operation.UploadResult
  111. for i := 0; i < 3; i++ {
  112. // assign one file id for one chunk
  113. fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
  114. if assignErr != nil {
  115. return "", nil, assignErr
  116. }
  117. // upload the chunk to the volume server
  118. uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
  119. if uploadErr != nil {
  120. time.Sleep(251 * time.Millisecond)
  121. continue
  122. }
  123. break
  124. }
  125. return fileId, uploadResult, uploadErr
  126. }
  127. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
  128. stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
  129. start := time.Now()
  130. defer func() {
  131. stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
  132. }()
  133. uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  134. if uploadResult != nil && uploadResult.RetryCount > 0 {
  135. stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
  136. }
  137. return uploadResult, err, data
  138. }