@ -3,6 +3,7 @@ package weed_server
import (
"context"
"crypto/md5"
"hash"
"io"
"io/ioutil"
"net/http"
@ -22,10 +23,6 @@ import (
func ( fs * FilerServer ) autoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
replication string , collection string , dataCenter string , ttlSec int32 , ttlString string , fsync bool ) bool {
if r . Method != "POST" {
glog . V ( 4 ) . Infoln ( "AutoChunking not supported for method" , r . Method )
return false
}
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r . URL . Query ( )
@ -57,7 +54,19 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
return false
}
reply , err := fs . doPostAutoChunk ( ctx , w , r , contentLength , chunkSize , replication , collection , dataCenter , ttlSec , ttlString , fsync )
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunk" ) . Inc ( )
start := time . Now ( )
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunk" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
var reply * FilerPostResult
var err error
if r . Method == "POST" {
reply , err = fs . doPostAutoChunk ( ctx , w , r , contentLength , chunkSize , replication , collection , dataCenter , ttlSec , ttlString , fsync )
} else {
reply , err = fs . doPutAutoChunk ( ctx , w , r , contentLength , chunkSize , replication , collection , dataCenter , ttlSec , ttlString , fsync )
}
if err != nil {
writeJsonError ( w , r , http . StatusInternalServerError , err )
} else if reply != nil {
@ -69,12 +78,6 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
func ( fs * FilerServer ) doPostAutoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
contentLength int64 , chunkSize int32 , replication string , collection string , dataCenter string , ttlSec int32 , ttlString string , fsync bool ) ( filerResult * FilerPostResult , replyerr error ) {
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunk" ) . Inc ( )
start := time . Now ( )
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunk" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
multipartReader , multipartReaderErr := r . MultipartReader ( )
if multipartReaderErr != nil {
return nil , multipartReaderErr
@ -90,46 +93,36 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
fileName = path . Base ( fileName )
}
contentType := part1 . Header . Get ( "Content-Type" )
if contentType == "application/octet-stream" {
contentType = ""
}
var fileChunks [ ] * filer_pb . FileChunk
md5Hash := md5 . New ( )
var partReader = ioutil . NopCloser ( io . TeeReader ( part1 , md5Hash ) )
chunkOffset := int64 ( 0 )
for chunkOffset < contentLength {
limitedReader := io . LimitReader ( partReader , int64 ( chunkSize ) )
fileChunks , md5Hash , chunkOffset , err := fs . uploadReaderToChunks ( w , r , part1 , contentLength , chunkSize , replication , collection , dataCenter , ttlString , fileName , contentType , fsync )
if err != nil {
return nil , err
}
// assign one file id for one chunk
fileId , urlLocation , auth , assignErr := fs . assignNewFileInfo ( replication , collection , dataCenter , ttlString , fsync )
if assignErr != nil {
return nil , assignErr
}
fileChunks , replyerr = filer2 . MaybeManifestize ( fs . saveAsChunk ( replication , collection , dataCenter , ttlString , fsync ) , fileChunks )
if replyerr != nil {
glog . V ( 0 ) . Infof ( "manifestize %s: %v" , r . RequestURI , replyerr )
return
}
// upload the chunk to the volume server
uploadResult , uploadErr := fs . doUpload ( urlLocation , w , r , limitedReader , fileName , contentType , nil , auth )
if uploadErr != nil {
return nil , uploadErr
}
filerResult , replyerr = fs . saveMetaData ( ctx , r , fileName , replication , collection , ttlSec , contentType , md5Hash , fileChunks , chunkOffset )
// if last chunk exhausted the reader exactly at the border
if uploadResult . Size == 0 {
break
}
return
}
// Save to chunk manifest structure
fileChunks = append ( fileChunks , uploadResult . ToPbFileChunk ( fileId , chunkOffset ) )
glog . V ( 4 ) . Infof ( "uploaded %s chunk %d to %s [%d,%d) of %d" , fileName , len ( fileChunks ) , fileId , chunkOffset , chunkOffset + int64 ( uploadResult . Size ) , contentLength )
func ( fs * FilerServer ) doPutAutoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
contentLength int64 , chunkSize int32 , replication string , collection string , dataCenter string , ttlSec int32 , ttlString string , fsync bool ) ( filerResult * FilerPostResult , replyerr error ) {
// reset variables for the next chunk
chunkOffset = chunkOffset + int64 ( uploadResult . Size )
fileName := ""
contentType := ""
// if last chunk was not at full chunk size, but already exhausted the reader
if int64 ( uploadResult . Size ) < int64 ( chunkSize ) {
break
}
fileChunks , md5Hash , chunkOffset , err := fs . uploadReaderToChunks ( w , r , r . Body , contentLength , chunkSize , replication , collection , dataCenter , ttlString , fileName , contentType , fsync )
if err != nil {
return nil , err
}
fileChunks , replyerr = filer2 . MaybeManifestize ( fs . saveAsChunk ( replication , collection , dataCenter , ttlString , fsync ) , fileChunks )
@ -138,6 +131,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
filerResult , replyerr = fs . saveMetaData ( ctx , r , fileName , replication , collection , ttlSec , contentType , md5Hash , fileChunks , chunkOffset )
return
}
func ( fs * FilerServer ) saveMetaData ( ctx context . Context , r * http . Request , fileName string , replication string , collection string , ttlSec int32 , contentType string , md5Hash hash . Hash , fileChunks [ ] * filer_pb . FileChunk , chunkOffset int64 ) ( filerResult * FilerPostResult , replyerr error ) {
path := r . URL . Path
if strings . HasSuffix ( path , "/" ) {
if fileName != "" {
@ -173,10 +172,52 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
replyerr = dbErr
filerResult . Error = dbErr . Error ( )
glog . V ( 0 ) . Infof ( "failing to write %s to filer server : %v" , path , dbErr )
return
}
return filerResult , replyerr
}
return
func ( fs * FilerServer ) uploadReaderToChunks ( w http . ResponseWriter , r * http . Request , reader io . Reader , contentLength int64 , chunkSize int32 , replication string , collection string , dataCenter string , ttlString string , fileName string , contentType string , fsync bool ) ( [ ] * filer_pb . FileChunk , hash . Hash , int64 , error ) {
var fileChunks [ ] * filer_pb . FileChunk
md5Hash := md5 . New ( )
var partReader = ioutil . NopCloser ( io . TeeReader ( reader , md5Hash ) )
chunkOffset := int64 ( 0 )
for chunkOffset < contentLength {
limitedReader := io . LimitReader ( partReader , int64 ( chunkSize ) )
// assign one file id for one chunk
fileId , urlLocation , auth , assignErr := fs . assignNewFileInfo ( replication , collection , dataCenter , ttlString , fsync )
if assignErr != nil {
return nil , nil , 0 , assignErr
}
// upload the chunk to the volume server
uploadResult , uploadErr := fs . doUpload ( urlLocation , w , r , limitedReader , fileName , contentType , nil , auth )
if uploadErr != nil {
return nil , nil , 0 , uploadErr
}
// if last chunk exhausted the reader exactly at the border
if uploadResult . Size == 0 {
break
}
// Save to chunk manifest structure
fileChunks = append ( fileChunks , uploadResult . ToPbFileChunk ( fileId , chunkOffset ) )
glog . V ( 4 ) . Infof ( "uploaded %s chunk %d to %s [%d,%d) of %d" , fileName , len ( fileChunks ) , fileId , chunkOffset , chunkOffset + int64 ( uploadResult . Size ) , contentLength )
// reset variables for the next chunk
chunkOffset = chunkOffset + int64 ( uploadResult . Size )
// if last chunk was not at full chunk size, but already exhausted the reader
if int64 ( uploadResult . Size ) < int64 ( chunkSize ) {
break
}
}
return fileChunks , md5Hash , chunkOffset , nil
}
func ( fs * FilerServer ) doUpload ( urlLocation string , w http . ResponseWriter , r * http . Request , limitedReader io . Reader , fileName string , contentType string , pairMap map [ string ] string , auth security . EncodedJwt ) ( * operation . UploadResult , error ) {