@ -20,6 +20,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"path"
"strconv"
)
type FilerPostResult struct {
@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
}
func ( fs * FilerServer ) PostHandler ( w http . ResponseWriter , r * http . Request ) {
query := r . URL . Query ( )
replication := query . Get ( "replication" )
if replication == "" {
@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
collection = fs . collection
}
if autoChunked := fs . autoChunk ( w , r , replication , collection ) ; autoChunked {
return
}
var fileId , urlLocation string
var err error
@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
u , _ := url . Parse ( urlLocation )
// This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
// because they need to provide FIDs instead of file paths...
cm , _ := strconv . ParseBool ( query . Get ( "cm" ) )
if cm {
q := u . Query ( )
q . Set ( "cm" , "true" )
u . RawQuery = q . Encode ( )
}
glog . V ( 4 ) . Infoln ( "post to" , u )
request := & http . Request {
Method : r . Method ,
URL : u ,
@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
func ( fs * FilerServer ) autoChunk ( w http . ResponseWriter , r * http . Request , replication string , collection string ) bool {
if r . Method != "POST" {
glog . V ( 4 ) . Infoln ( "AutoChunking not supported for method" , r . Method )
return false
}
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r . URL . Query ( )
parsedMaxMB , _ := strconv . ParseInt ( query . Get ( "maxMB" ) , 10 , 32 )
maxMB := int32 ( parsedMaxMB )
if maxMB <= 0 && fs . maxMB > 0 {
maxMB = int32 ( fs . maxMB )
}
if maxMB <= 0 {
glog . V ( 4 ) . Infoln ( "AutoChunking not enabled" )
return false
}
glog . V ( 4 ) . Infoln ( "AutoChunking level set to" , maxMB , "(MB)" )
chunkSize := 1024 * 1024 * maxMB
contentLength := int64 ( 0 )
if contentLengthHeader := r . Header [ "Content-Length" ] ; len ( contentLengthHeader ) == 1 {
contentLength , _ = strconv . ParseInt ( contentLengthHeader [ 0 ] , 10 , 64 )
if contentLength <= int64 ( chunkSize ) {
glog . V ( 4 ) . Infoln ( "Content-Length of" , contentLength , "is less than the chunk size of" , chunkSize , "so autoChunking will be skipped." )
return false
}
}
if contentLength <= 0 {
glog . V ( 4 ) . Infoln ( "Content-Length value is missing or unexpected so autoChunking will be skipped." )
return false
}
reply , err := fs . doAutoChunk ( w , r , contentLength , chunkSize , replication , collection )
if err != nil {
writeJsonError ( w , r , http . StatusInternalServerError , err )
} else if reply != nil {
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
return true
}
func ( fs * FilerServer ) doAutoChunk ( w http . ResponseWriter , r * http . Request , contentLength int64 , chunkSize int32 , replication string , collection string ) ( filerResult * FilerPostResult , replyerr error ) {
multipartReader , multipartReaderErr := r . MultipartReader ( )
if multipartReaderErr != nil {
return nil , multipartReaderErr
}
part1 , part1Err := multipartReader . NextPart ( )
if part1Err != nil {
return nil , part1Err
}
fileName := part1 . FileName ( )
if fileName != "" {
fileName = path . Base ( fileName )
}
chunks := ( int64 ( contentLength ) / int64 ( chunkSize ) ) + 1
cm := operation . ChunkManifest {
Name : fileName ,
Size : 0 , // don't know yet
Mime : "application/octet-stream" ,
Chunks : make ( [ ] * operation . ChunkInfo , 0 , chunks ) ,
}
totalBytesRead := int64 ( 0 )
tmpBufferSize := int32 ( 1024 * 1024 )
tmpBuffer := bytes . NewBuffer ( make ( [ ] byte , 0 , tmpBufferSize ) )
chunkBuf := make ( [ ] byte , chunkSize + tmpBufferSize , chunkSize + tmpBufferSize ) // chunk size plus a little overflow
chunkBufOffset := int32 ( 0 )
chunkOffset := int64 ( 0 )
writtenChunks := 0
filerResult = & FilerPostResult {
Name : fileName ,
}
for totalBytesRead < contentLength {
tmpBuffer . Reset ( )
bytesRead , readErr := io . CopyN ( tmpBuffer , part1 , int64 ( tmpBufferSize ) )
readFully := readErr != nil && readErr == io . EOF
tmpBuf := tmpBuffer . Bytes ( )
bytesToCopy := tmpBuf [ 0 : int ( bytesRead ) ]
copy ( chunkBuf [ chunkBufOffset : chunkBufOffset + int32 ( bytesRead ) ] , bytesToCopy )
chunkBufOffset = chunkBufOffset + int32 ( bytesRead )
if chunkBufOffset >= chunkSize || readFully || ( chunkBufOffset > 0 && bytesRead == 0 ) {
writtenChunks = writtenChunks + 1
fileId , urlLocation , assignErr := fs . assignNewFileInfo ( w , r , replication , collection )
if assignErr != nil {
return nil , assignErr
}
// upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv . FormatInt ( int64 ( cm . Chunks . Len ( ) + 1 ) , 10 )
uploadErr := fs . doUpload ( urlLocation , w , r , chunkBuf [ 0 : chunkBufOffset ] , chunkName , "application/octet-stream" , fileId )
if uploadErr != nil {
return nil , uploadErr
}
// Save to chunk manifest structure
cm . Chunks = append ( cm . Chunks ,
& operation . ChunkInfo {
Offset : chunkOffset ,
Size : int64 ( chunkBufOffset ) ,
Fid : fileId ,
} ,
)
// reset variables for the next chunk
chunkBufOffset = 0
chunkOffset = totalBytesRead + int64 ( bytesRead )
}
totalBytesRead = totalBytesRead + int64 ( bytesRead )
if bytesRead == 0 || readFully {
break
}
if readErr != nil {
return nil , readErr
}
}
cm . Size = totalBytesRead
manifestBuf , marshalErr := cm . Marshal ( )
if marshalErr != nil {
return nil , marshalErr
}
manifestStr := string ( manifestBuf )
glog . V ( 4 ) . Infoln ( "Generated chunk manifest: " , manifestStr )
manifestFileId , manifestUrlLocation , manifestAssignmentErr := fs . assignNewFileInfo ( w , r , replication , collection )
if manifestAssignmentErr != nil {
return nil , manifestAssignmentErr
}
glog . V ( 4 ) . Infoln ( "Manifest uploaded to:" , manifestUrlLocation , "Fid:" , manifestFileId )
filerResult . Fid = manifestFileId
u , _ := url . Parse ( manifestUrlLocation )
q := u . Query ( )
q . Set ( "cm" , "true" )
u . RawQuery = q . Encode ( )
manifestUploadErr := fs . doUpload ( u . String ( ) , w , r , manifestBuf , fileName + "_manifest" , "application/json" , manifestFileId )
if manifestUploadErr != nil {
return nil , manifestUploadErr
}
path := r . URL . Path
// also delete the old fid unless PUT operation
if r . Method != "PUT" {
if oldFid , err := fs . filer . FindFile ( path ) ; err == nil {
operation . DeleteFile ( fs . getMasterNode ( ) , oldFid , fs . jwt ( oldFid ) )
}
}
glog . V ( 4 ) . Infoln ( "saving" , path , "=>" , manifestFileId )
if db_err := fs . filer . CreateFile ( path , manifestFileId ) ; db_err != nil {
replyerr = db_err
filerResult . Error = db_err . Error ( )
operation . DeleteFile ( fs . getMasterNode ( ) , manifestFileId , fs . jwt ( manifestFileId ) ) //clean up
glog . V ( 0 ) . Infof ( "failing to write %s to filer server : %v" , path , db_err )
return
}
return
}
func ( fs * FilerServer ) doUpload ( urlLocation string , w http . ResponseWriter , r * http . Request , chunkBuf [ ] byte , fileName string , contentType string , fileId string ) ( err error ) {
err = nil
ioReader := ioutil . NopCloser ( bytes . NewBuffer ( chunkBuf ) )
uploadResult , uploadError := operation . Upload ( urlLocation , fileName , ioReader , false , contentType , fs . jwt ( fileId ) )
if uploadResult != nil {
glog . V ( 0 ) . Infoln ( "Chunk upload result. Name:" , uploadResult . Name , "Fid:" , fileId , "Size:" , uploadResult . Size )
}
if uploadError != nil {
err = uploadError
}
return
}
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
func ( fs * FilerServer ) DeleteHandler ( w http . ResponseWriter , r * http . Request ) {