diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index b48d73deb..a1df07d7e 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/valyala/bytebufferpool" "io" "mime" "mime/multipart" @@ -32,6 +33,7 @@ type UploadOption struct { Jwt security.EncodedJwt RetryForever bool Md5 string + BytesBuffer *bytes.Buffer } type UploadResult struct { @@ -261,6 +263,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult PairMap: option.PairMap, Jwt: option.Jwt, Md5: option.Md5, + BytesBuffer: option.BytesBuffer, }) if uploadResult == nil { return @@ -275,9 +278,17 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult } func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { - buf := GetBuffer() - defer PutBuffer(buf) - body_writer := multipart.NewWriter(buf) + var body_writer *multipart.Writer + var reqReader *bytes.Reader + var buf *bytebufferpool.ByteBuffer + if option.BytesBuffer == nil { + buf = GetBuffer() + defer PutBuffer(buf) + body_writer = multipart.NewWriter(buf) + } else { + option.BytesBuffer.Reset() + body_writer = multipart.NewWriter(option.BytesBuffer) + } h := make(textproto.MIMEHeader) filename := fileNameEscaper.Replace(option.Filename) h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename)) @@ -309,8 +320,12 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize glog.V(0).Infoln("error closing body", err) return nil, err } - - req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes())) + if option.BytesBuffer == nil { + reqReader = bytes.NewReader(buf.Bytes()) + } else { + reqReader = bytes.NewReader(option.BytesBuffer.Bytes()) + } + req, postErr := http.NewRequest("POST", option.UploadUrl, reqReader) if postErr != nil { glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr) return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index eeb031cd1..b9571710d 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -234,7 +234,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry } } - chunks = append(chunks, manifestChunks...) + chunks = append(manifestChunks, chunks...) return } diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 7517d8641..6e151bf80 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,7 +1,6 @@ package weed_server import ( - "bytes" "errors" "fmt" "net/http" @@ -13,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/topology" + "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { @@ -35,8 +35,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - bytesBuffer := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bytesBuffer) + bytesBuffer := buffer_pool.SyncPoolGetBuffer() + defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer) if ne != nil { diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go index 6546f35a6..51d3bcf40 100644 --- a/weed/storage/needle/needle_write.go +++ b/weed/storage/needle/needle_write.go @@ -7,16 +7,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/backend" . "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" "math" - "sync" ) -var bufPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) { writeBytes.Reset() switch version { @@ -132,8 +126,8 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return } - bytesBuffer := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bytesBuffer) + bytesBuffer := buffer_pool.SyncPoolGetBuffer() + defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index a5a406459..82c2db79c 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" ) func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) { @@ -87,6 +88,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt pairMap[needle.PairNamePrefix+k] = v } } + bytesBuffer := buffer_pool.SyncPoolGetBuffer() + defer buffer_pool.SyncPoolPutBuffer(bytesBuffer) // volume server do not know about encryption // TODO optimize here to compress data only once @@ -99,6 +102,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt PairMap: pairMap, Jwt: jwt, Md5: contentMd5, + BytesBuffer: bytesBuffer, } _, err := operation.UploadData(n.Data, uploadOption) diff --git a/weed/util/buffer_pool/sync_pool.go b/weed/util/buffer_pool/sync_pool.go new file mode 100644 index 000000000..b97274691 --- /dev/null +++ b/weed/util/buffer_pool/sync_pool.go @@ -0,0 +1,20 @@ +package buffer_pool + +import ( + "bytes" + "sync" +) + +var syncPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +func SyncPoolGetBuffer() *bytes.Buffer { + return syncPool.Get().(*bytes.Buffer) +} + +func SyncPoolPutBuffer(buffer *bytes.Buffer) { + syncPool.Put(buffer) +}