@@ -6,9 +6,7 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"runtime"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/filer"
@@ -20,143 +18,75 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-var (
-	limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
-)
-
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
-
-	md5Hash = md5.New()
-	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
-
-	// save small content directly
-	if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
-		smallContent, err = ioutil.ReadAll(partReader)
-		dataSize = int64(len(smallContent))
-		return
-	}
-
-	resultsChan := make(chan *ChunkCreationResult, 2)
-
-	var waitForAllData sync.WaitGroup
-	waitForAllData.Add(1)
-	go func() {
-		// process upload results
-		defer waitForAllData.Done()
-		for result := range resultsChan {
-			if result.err != nil {
-				err = result.err
-				continue
-			}
-
-			// Save to chunk manifest structure
-			fileChunks = append(fileChunks, result.chunk)
-		}
-	}()
-
-	var lock sync.Mutex
-	readOffset := int64(0)
-	var wg sync.WaitGroup
-
-	for err == nil {
-
-		wg.Add(1)
-		request := func() {
-			defer wg.Done()
-
-			var localOffset int64
-			// read from the input
-			lock.Lock()
-			localOffset = readOffset
-			limitedReader := io.LimitReader(partReader, int64(chunkSize))
-			data, readErr := ioutil.ReadAll(limitedReader)
-			readOffset += int64(len(data))
-			lock.Unlock()
-			// handle read errors
-			if readErr != nil {
-				if err == nil {
-					err = readErr
-				}
-				if readErr != io.EOF {
-					resultsChan <- &ChunkCreationResult{
-						err: readErr,
-					}
-				}
-				return
-			}
-			if len(data) == 0 {
-				readErr = io.EOF
-				if err == nil {
-					err = readErr
-				}
-				return
-			}
-
-			// upload
-			dataReader := util.NewBytesReader(data)
-			fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
-			if uploadErr != nil {
-				if err == nil {
-					err = uploadErr
-				}
-				resultsChan <- &ChunkCreationResult{
-					err: uploadErr,
-				}
-				return
-			}
-
-			glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
-
-			// send back uploaded file chunk
-			resultsChan <- &ChunkCreationResult{
-				chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
-			}
-
-		}
-		limitedUploadProcessor.Execute(request)
-	}
-
-	go func() {
-		wg.Wait()
-		close(resultsChan)
-	}()
-
-	waitForAllData.Wait()
-
-	if err == io.EOF {
-		err = nil
-	}
-
-	return fileChunks, md5Hash, readOffset, err, nil
-}
-
-type ChunkCreationResult struct {
-	chunk *filer_pb.FileChunk
-	err   error
-}
-
-func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
-	// retry to assign a different file id
-	var fileId, urlLocation string
-	var auth security.EncodedJwt
-	var assignErr, uploadErr error
-	var uploadResult *operation.UploadResult
-	for i := 0; i < 3; i++ {
-		// assign one file id for one chunk
-		fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
-		if assignErr != nil {
-			return "", nil, assignErr
-		}
-
-		// upload the chunk to the volume server
-		uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
-		if uploadErr != nil {
-			time.Sleep(251 * time.Millisecond)
-			continue
-		}
-		break
-	}
-	return fileId, uploadResult, uploadErr
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
+	var fileChunks []*filer_pb.FileChunk
+
+	md5Hash := md5.New()
+	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+	chunkOffset := int64(0)
+	var smallContent []byte
+
+	for {
+		limitedReader := io.LimitReader(partReader, int64(chunkSize))
+		data, err := ioutil.ReadAll(limitedReader)
+		if err != nil {
+			return nil, nil, 0, err, nil
+		}
+		if chunkOffset == 0 && !isAppend(r) {
+			if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+				smallContent = data
+				chunkOffset += int64(len(data))
+				break
+			}
+		}
+
+		dataReader := util.NewBytesReader(data)
+
+		// retry to assign a different file id
+		var fileId, urlLocation string
+		var auth security.EncodedJwt
+		var assignErr, uploadErr error
+		var uploadResult *operation.UploadResult
+		for i := 0; i < 3; i++ {
+			// assign one file id for one chunk
+			fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+			if assignErr != nil {
+				return nil, nil, 0, assignErr, nil
+			}
+
+			// upload the chunk to the volume server
+			uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+			if uploadErr != nil {
+				time.Sleep(251 * time.Millisecond)
+				continue
+			}
+			break
+		}
+		if uploadErr != nil {
+			return nil, nil, 0, uploadErr, nil
+		}
+
+		// if last chunk exhausted the reader exactly at the border
+		if uploadResult.Size == 0 {
+			break
+		}
+
+		// Save to chunk manifest structure
+		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+		// reset variables for the next chunk
+		chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+		// if last chunk was not at full chunk size, but already exhausted the reader
+		if int64(uploadResult.Size) < int64(chunkSize) {
+			break
+		}
+	}
+
+	return fileChunks, md5Hash, chunkOffset, nil, smallContent
 }
 
 func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
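
Note (not part of the patch): this change drops the out-of-order parallel uploader and restores the earlier sequential loop. The read pattern the restored code relies on is easy to get wrong, because ioutil.ReadAll on an io.LimitReader reports end-of-stream as a nil error with an empty slice, so the loop must detect exhaustion through an empty or short chunk rather than an io.EOF error. The standalone Go sketch below reproduces just that pattern (a TeeReader feeding an MD5 hash while a LimitReader slices the stream); the name splitIntoChunks is illustrative only and does not exist in SeaweedFS:

	package main

	import (
		"bytes"
		"crypto/md5"
		"fmt"
		"io"
		"io/ioutil"
	)

	// splitIntoChunks mirrors the read pattern of the restored
	// uploadReaderToChunks: every byte read is teed into an md5 hash
	// while io.LimitReader slices the stream into chunkSize pieces.
	// Standalone sketch only; not code from this patch.
	func splitIntoChunks(reader io.Reader, chunkSize int64) (chunks [][]byte, md5Sum []byte, err error) {
		md5Hash := md5.New()
		partReader := io.TeeReader(reader, md5Hash)
		for {
			// ReadAll on a LimitReader returns (data, nil) at end of
			// stream, so an empty or short chunk is the only
			// termination signal.
			data, readErr := ioutil.ReadAll(io.LimitReader(partReader, chunkSize))
			if readErr != nil {
				return nil, nil, readErr
			}
			if len(data) == 0 {
				break
			}
			chunks = append(chunks, data)
			if int64(len(data)) < chunkSize {
				break
			}
		}
		return chunks, md5Hash.Sum(nil), nil
	}

	func main() {
		// 10 KiB input with 4 KiB chunks -> 3 chunks (4+4+2 KiB)
		chunks, sum, err := splitIntoChunks(bytes.NewReader(make([]byte, 10*1024)), 4*1024)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%d chunks, md5=%x\n", len(chunks), sum)
	}

On top of this pattern, the restored server code retries each chunk up to three times with a freshly assigned file id and a 251 ms pause between attempts, and keeps first reads smaller than fs.option.SaveToFilerLimit inline as smallContent instead of creating a chunk.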