Browse Source

use bytes.Buffer to reduce memory allocation and gc

pull/2115/head
Chris Lu 4 years ago
parent
commit
6c82326575
  1. 2
      weed/operation/needle_parse_test.go
  2. 5
      weed/server/common.go
  3. 6
      weed/server/filer_server_handlers_write_cipher.go
  4. 24
      weed/server/filer_server_handlers_write_upload.go
  5. 6
      weed/server/volume_server_handlers_write.go
  6. 5
      weed/storage/needle/needle.go
  7. 29
      weed/storage/needle/needle_parse_upload.go
  8. 59
      weed/storage/needle/needle_read_write.go

2
weed/operation/needle_parse_test.go

@ -18,7 +18,7 @@ type MockClient struct {
}
func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024, &bytes.Buffer{})
if m.needleHandling != nil {
m.needleHandling(n, originalSize, err)
}

5
weed/server/common.go

@ -1,6 +1,7 @@
package weed_server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -104,7 +105,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
debug("parsing upload file...")
pu, pe := needle.ParseUpload(r, 256*1024*1024)
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
pu, pe := needle.ParseUpload(r, 256*1024*1024, bytesBuffer)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return

6
weed/server/filer_server_handlers_write_cipher.go

@ -1,6 +1,7 @@
package weed_server
import (
"bytes"
"context"
"fmt"
"net/http"
@ -30,7 +31,10 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
pu, err := needle.ParseUpload(r, sizeLimit)
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
pu, err := needle.ParseUpload(r, sizeLimit, bytesBuffer)
uncompressedData := pu.Data
if pu.IsGzipped {
uncompressedData = pu.UncompressedData

24
weed/server/filer_server_handlers_write_upload.go

@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@ -19,6 +20,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
var fileChunks []*filer_pb.FileChunk
@ -28,21 +35,28 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset := int64(0)
var smallContent []byte
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
data, err := ioutil.ReadAll(limitedReader)
bytesBuffer.Reset()
dataSize, err := bytesBuffer.ReadFrom(limitedReader)
// data, err := ioutil.ReadAll(limitedReader)
if err != nil {
return nil, nil, 0, err, nil
}
if chunkOffset == 0 && !isAppend(r) {
if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
smallContent = data
chunkOffset += int64(len(data))
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 {
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
bytesBuffer.Write(smallContent)
break
}
}
dataReader := util.NewBytesReader(data)
dataReader := util.NewBytesReader(bytesBuffer.Bytes())
// retry to assign a different file id
var fileId, urlLocation string

6
weed/server/volume_server_handlers_write.go

@ -1,6 +1,7 @@
package weed_server
import (
"bytes"
"errors"
"fmt"
"net/http"
@ -42,7 +43,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return

5
weed/storage/needle/needle.go

@ -1,6 +1,7 @@
package needle
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
@ -48,9 +49,9 @@ func (n *Needle) String() (str string) {
return
}
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64, bytesBuffer *bytes.Buffer) (n *Needle, originalSize int, contentMd5 string, e error) {
n = new(Needle)
pu, e := ParseUpload(r, sizeLimit)
pu, e := ParseUpload(r, sizeLimit, bytesBuffer)
if e != nil {
return
}

29
weed/storage/needle/needle_parse_upload.go

@ -1,6 +1,7 @@
package needle
import (
"bytes"
"crypto/md5"
"encoding/base64"
"fmt"
@ -20,6 +21,7 @@ import (
type ParsedUpload struct {
FileName string
Data []byte
bytesBuffer *bytes.Buffer
MimeType string
PairMap map[string]string
IsGzipped bool
@ -32,8 +34,9 @@ type ParsedUpload struct {
ContentMd5 string
}
func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
pu = &ParsedUpload{}
func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (pu *ParsedUpload, e error) {
bytesBuffer.Reset()
pu = &ParsedUpload{bytesBuffer: bytesBuffer}
pu.PairMap = make(map[string]string)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
@ -72,6 +75,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
if mimeType == "application/octet-stream" {
mimeType = ""
}
if false {
if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
// println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType)
if compressedData, err := util.GzipData(pu.Data); err == nil {
@ -83,6 +87,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
}
}
}
}
// md5
h := md5.New()
@ -98,15 +103,16 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
return
}
func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) error {
pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip"
// pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd"
pu.MimeType = r.Header.Get("Content-Type")
pu.FileName = ""
pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
if e == io.EOF || int64(pu.OriginalDataSize) == sizeLimit+1 {
dataSize, err := pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1))
if err == io.EOF || dataSize == sizeLimit+1 {
io.Copy(ioutil.Discard, r.Body)
}
pu.Data = pu.bytesBuffer.Bytes()
r.Body.Close()
return nil
}
@ -138,15 +144,17 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
pu.FileName = path.Base(pu.FileName)
}
pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
var dataSize int64
dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(part, sizeLimit+1))
if e != nil {
glog.V(0).Infoln("Reading Content [ERROR]", e)
return
}
if len(pu.Data) == int(sizeLimit)+1 {
if dataSize == sizeLimit+1 {
e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
return
}
pu.Data = pu.bytesBuffer.Bytes()
// if the filename is empty string, do a search on the other multi-part items
for pu.FileName == "" {
@ -159,19 +167,20 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
// found the first <file type> multi-part has filename
if fName != "" {
data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
pu.bytesBuffer.Reset()
dataSize2, fe2 := pu.bytesBuffer.ReadFrom(io.LimitReader(part2, sizeLimit+1))
if fe2 != nil {
glog.V(0).Infoln("Reading Content [ERROR]", fe2)
e = fe2
return
}
if len(data2) == int(sizeLimit)+1 {
if dataSize2 == sizeLimit+1 {
e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
return
}
// update
pu.Data = data2
pu.Data = pu.bytesBuffer.Bytes()
pu.FileName = path.Base(fName)
break
}

59
weed/storage/needle/needle_read_write.go

@ -1,6 +1,7 @@
package needle
import (
"bytes"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -9,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"math"
"sync"
)
const (
@ -29,10 +31,14 @@ func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) {
writeBytes := make([]byte, 0)
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) {
writeBytes.Reset()
switch version {
case Version1:
header := make([]byte, NeedleHeaderSize)
@ -42,12 +48,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
size := n.Size
actualSize := NeedleHeaderSize + int64(n.Size)
writeBytes = append(writeBytes, header...)
writeBytes = append(writeBytes, n.Data...)
writeBytes.Write(header)
writeBytes.Write(n.Data)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
return writeBytes, size, actualSize, nil
writeBytes.Write(header[0:NeedleChecksumSize+padding])
return size, actualSize, nil
case Version2, Version3:
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
CookieToBytes(header[0:CookieSize], n.Cookie)
@ -79,51 +85,51 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error
n.Size = 0
}
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
writeBytes.Write(header[0:NeedleHeaderSize])
if n.DataSize > 0 {
util.Uint32toBytes(header[0:4], n.DataSize)
writeBytes = append(writeBytes, header[0:4]...)
writeBytes = append(writeBytes, n.Data...)
writeBytes.Write(header[0:4])
writeBytes.Write(n.Data)
util.Uint8toBytes(header[0:1], n.Flags)
writeBytes = append(writeBytes, header[0:1]...)
writeBytes.Write(header[0:1])
if n.HasName() {
util.Uint8toBytes(header[0:1], n.NameSize)
writeBytes = append(writeBytes, header[0:1]...)
writeBytes = append(writeBytes, n.Name[:n.NameSize]...)
writeBytes.Write(header[0:1])
writeBytes.Write(n.Name[:n.NameSize])
}
if n.HasMime() {
util.Uint8toBytes(header[0:1], n.MimeSize)
writeBytes = append(writeBytes, header[0:1]...)
writeBytes = append(writeBytes, n.Mime...)
writeBytes.Write(header[0:1])
writeBytes.Write(n.Mime)
}
if n.HasLastModifiedDate() {
util.Uint64toBytes(header[0:8], n.LastModified)
writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...)
writeBytes.Write(header[8-LastModifiedBytesLength:8])
}
if n.HasTtl() && n.Ttl != nil {
n.Ttl.ToBytes(header[0:TtlBytesLength])
writeBytes = append(writeBytes, header[0:TtlBytesLength]...)
writeBytes.Write(header[0:TtlBytesLength])
}
if n.HasPairs() {
util.Uint16toBytes(header[0:2], n.PairsSize)
writeBytes = append(writeBytes, header[0:2]...)
writeBytes = append(writeBytes, n.Pairs...)
writeBytes.Write(header[0:2])
writeBytes.Write(n.Pairs)
}
}
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
if version == Version2 {
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
writeBytes.Write(header[0:NeedleChecksumSize+padding])
} else {
// version3
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
writeBytes.Write(header[0:NeedleChecksumSize+TimestampSize+padding])
}
return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil
return Size(n.DataSize), GetActualSize(n.Size, version), nil
}
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
@ -146,10 +152,13 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return
}
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
bytesBuffer := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bytesBuffer)
size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer)
if err == nil {
_, err = w.WriteAt(bytesToWrite, int64(offset))
_, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
}
return offset, size, actualSize, err

Loading…
Cancel
Save