|
|
@ -29,7 +29,6 @@ var bufPool = sync.Pool{ |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { |
|
|
|
var fileChunks []*filer_pb.FileChunk |
|
|
|
|
|
|
|
md5Hash := md5.New() |
|
|
|
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) |
|
|
@ -41,6 +40,8 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque |
|
|
|
var wg sync.WaitGroup |
|
|
|
var bytesBufferCounter int64 |
|
|
|
bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) |
|
|
|
var fileChunks []*filer_pb.FileChunk |
|
|
|
var fileChunksLock sync.Mutex |
|
|
|
for { |
|
|
|
|
|
|
|
// need to throttle used byte buffer
|
|
|
@ -94,7 +95,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque |
|
|
|
uploadErr = toChunkErr |
|
|
|
} |
|
|
|
if chunk != nil { |
|
|
|
fileChunksLock.Lock() |
|
|
|
fileChunks = append(fileChunks, chunk) |
|
|
|
fileChunksLock.Unlock() |
|
|
|
glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) |
|
|
|
} |
|
|
|
}(chunkOffset) |
|
|
|