upload in chunks

pull/7481/head
chrislu 3 weeks ago
parent commit 7af0594eb7

  1. weed/operation/upload_chunked.go (214)
  2. weed/s3api/s3api_object_handlers_put.go (230)

weed/operation/upload_chunked.go

@@ -0,0 +1,214 @@
package operation

import (
	"bytes"
	"context"
	"crypto/md5"
	"fmt"
	"hash"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
)

// ChunkedUploadResult contains the result of a chunked upload
type ChunkedUploadResult struct {
	FileChunks   []*filer_pb.FileChunk
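	// Md5Hash is the running hash over everything read from the reader;
	// callers take Sum(nil) after the upload to get the final digest.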
	Md5Hash      hash.Hash
	TotalSize    int64
	SmallContent []byte // For files smaller than threshold
}

// ChunkedUploadOption contains options for chunked uploads
type ChunkedUploadOption struct {
	ChunkSize       int32
	SmallFileLimit  int64
	Collection      string
	Replication     string
	DataCenter      string
	SaveSmallInline bool
	Jwt             security.EncodedJwt
	MimeType        string
	AssignFunc      func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
}
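
// chunkBufferPool recycles chunk-sized bytes.Buffers between reads, so
// successive chunks reuse already-grown buffers instead of allocating new ones.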
var chunkBufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

// UploadReaderInChunks reads from reader and uploads in chunks to volume servers
// This prevents OOM by processing the stream in fixed-size chunks
// Returns file chunks, MD5 hash, total size, and any small content stored inline
func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) {
	md5Hash := md5.New()
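	// The TeeReader mirrors every byte pulled from reader into md5Hash, so the
	// digest covers the whole object even though it is consumed chunk by chunk.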
	var partReader = io.TeeReader(reader, md5Hash)
	var fileChunks []*filer_pb.FileChunk
	var fileChunksLock sync.Mutex
	var uploadErr error
	var uploadErrLock sync.Mutex
	var chunkOffset int64 = 0
	var wg sync.WaitGroup
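	// bytesBufferLimitChan is a counting semaphore: at most 4 chunk buffers
	// (and their uploads) are in flight at once, bounding memory use per call
	// to roughly 4 x ChunkSize.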
	var bytesBufferCounter int64 = 4
	bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
	for {
		// Throttle buffer usage
		bytesBufferLimitChan <- struct{}{}
		// Check for errors from parallel uploads
		uploadErrLock.Lock()
		if uploadErr != nil {
			<-bytesBufferLimitChan
			uploadErrLock.Unlock()
			break
		}
		uploadErrLock.Unlock()
		// Get buffer from pool
		bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer)
		limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize))
		bytesBuffer.Reset()
		// Read one chunk
		dataSize, err := bytesBuffer.ReadFrom(limitedReader)
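		// A zero-byte read with a nil error means the source is exhausted:
		// bytes.Buffer.ReadFrom does not surface io.EOF, it simply stops.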
		if err != nil || dataSize == 0 {
			chunkBufferPool.Put(bytesBuffer)
			<-bytesBufferLimitChan
			if err != nil {
				uploadErrLock.Lock()
				if uploadErr == nil {
					uploadErr = err
				}
				uploadErrLock.Unlock()
			}
			break
		}
		// For small files at offset 0, store inline instead of uploading
		if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit {
			smallContent := make([]byte, dataSize)
			bytesBuffer.Read(smallContent)
			chunkBufferPool.Put(bytesBuffer)
			<-bytesBufferLimitChan
			return &ChunkedUploadResult{
				FileChunks:   nil,
				Md5Hash:      md5Hash,
				TotalSize:    dataSize,
				SmallContent: smallContent,
			}, nil
		}
		// Upload chunk in parallel goroutine
		wg.Add(1)
		go func(offset int64, buf *bytes.Buffer) {
			defer func() {
				chunkBufferPool.Put(buf)
				<-bytesBufferLimitChan
				wg.Done()
			}()
			// Assign volume for this chunk
			_, assignResult, assignErr := opt.AssignFunc(ctx, 1)
			if assignErr != nil {
				uploadErrLock.Lock()
				if uploadErr == nil {
					uploadErr = fmt.Errorf("assign volume: %w", assignErr)
				}
				uploadErrLock.Unlock()
				return
			}
			// Upload chunk data
			uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
			uploadOption := &UploadOption{
				UploadUrl:         uploadUrl,
				Cipher:            false,
				IsInputCompressed: false,
				MimeType:          opt.MimeType,
				PairMap:           nil,
				Jwt:               opt.Jwt,
			}
			uploader, uploaderErr := NewUploader()
			if uploaderErr != nil {
				uploadErrLock.Lock()
				if uploadErr == nil {
					uploadErr = fmt.Errorf("create uploader: %w", uploaderErr)
				}
				uploadErrLock.Unlock()
				return
			}
			uploadResult, uploadResultErr := uploader.UploadData(ctx, buf.Bytes(), uploadOption)
			if uploadResultErr != nil {
				uploadErrLock.Lock()
				if uploadErr == nil {
					uploadErr = fmt.Errorf("upload chunk: %w", uploadResultErr)
				}
				uploadErrLock.Unlock()
				return
			}
			// Create chunk entry
			fid, _ := filer_pb.ToFileIdObject(assignResult.Fid)
			chunk := &filer_pb.FileChunk{
				FileId:    assignResult.Fid,
				Offset:    offset,
				Size:      uint64(uploadResult.Size),
				ETag:      uploadResult.ContentMd5,
				Fid:       fid,
				CipherKey: uploadResult.CipherKey,
			}
			fileChunksLock.Lock()
			fileChunks = append(fileChunks, chunk)
			glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
			fileChunksLock.Unlock()
		}(chunkOffset, bytesBuffer)
		// Update offset for next chunk
		chunkOffset += dataSize
		// If this was a partial chunk, we're done
		if dataSize < int64(opt.ChunkSize) {
			break
		}
	}
	// Wait for all uploads to complete
	wg.Wait()
	// Check for errors
	if uploadErr != nil {
		glog.Errorf("chunked upload failed: %v", uploadErr)
		return nil, uploadErr
	}
	// Sort chunks by offset
	// Note: We could use slices.SortFunc here, but keeping it simple for Go 1.20 compatibility
	for i := 0; i < len(fileChunks); i++ {
		for j := i + 1; j < len(fileChunks); j++ {
			if fileChunks[i].Offset > fileChunks[j].Offset {
				fileChunks[i], fileChunks[j] = fileChunks[j], fileChunks[i]
			}
		}
	}
	return &ChunkedUploadResult{
		FileChunks:   fileChunks,
		Md5Hash:      md5Hash,
		TotalSize:    chunkOffset,
		SmallContent: nil,
	}, nil
}
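
For context, a minimal caller sketch (not part of this commit; the assign callback, file id, and server address are placeholders) showing how UploadReaderInChunks is wired up: the caller supplies an AssignFunc that hands back one file id per chunk, then persists the returned FileChunks on a filer entry.

// Hypothetical usage sketch, assuming a reachable volume server; the assign
// callback below is a stand-in for a real master/filer AssignVolume call.
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/operation"
)

func main() {
	f, err := os.Open("large-object.bin") // any io.Reader works; total size need not be known up front
	if err != nil {
		panic(err)
	}
	defer f.Close()

	result, err := operation.UploadReaderInChunks(context.Background(), f, &operation.ChunkedUploadOption{
		ChunkSize:       8 * 1024 * 1024, // same 8MB chunk size the S3 handler uses
		SmallFileLimit:  256 * 1024,
		SaveSmallInline: false,
		MimeType:        "application/octet-stream",
		AssignFunc: func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
			// Placeholder assignment; a real caller asks the master or filer for a file id.
			return nil, &operation.AssignResult{Fid: "3,01637037d6", Url: "127.0.0.1:8080"}, nil
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("uploaded %d bytes in %d chunks, md5=%x\n",
		result.TotalSize, len(result.FileChunks), result.Md5Hash.Sum(nil))
}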

weed/s3api/s3api_object_handlers_put.go

@@ -296,151 +296,117 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
}
filePath = decodedPath
// Calculate MD5 hash
hash := md5.New()
var body = io.TeeReader(dataReader, hash)
// Step 1: Assign volume from filer (via gRPC)
var assignResult *filer_pb.AssignVolumeResponse
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
collection := ""
if s3a.option.FilerGroup != "" {
collection = s3a.getCollectionName(bucket)
}
resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: collection,
DiskType: "",
DataCenter: s3a.option.DataCenter,
Path: filePath,
// Step 1 & 2: Use auto-chunking to handle large files without OOM
// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
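// Combined with the uploader's limit of 4 in-flight chunk buffers, this caps
// buffered data per PUT at roughly 4 x 8MB (see bytesBufferCounter in UploadReaderInChunks)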
collection := ""
if s3a.option.FilerGroup != "" {
collection = s3a.getCollectionName(bucket)
}
// Create assign function for chunked upload
assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
var assignResult *filer_pb.AssignVolumeResponse
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
Count: int32(count),
Replication: "",
Collection: collection,
DiskType: "",
DataCenter: s3a.option.DataCenter,
Path: filePath,
})
if err != nil {
return fmt.Errorf("assign volume: %w", err)
}
if resp.Error != "" {
return fmt.Errorf("assign volume: %v", resp.Error)
}
assignResult = resp
return nil
})
if err != nil {
return fmt.Errorf("assign volume: %w", err)
return nil, nil, err
}
if resp.Error != "" {
return fmt.Errorf("assign volume: %v", resp.Error)
}
assignResult = resp
return nil
// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
return nil, &operation.AssignResult{
Fid: assignResult.FileId,
Url: assignResult.Location.Url,
PublicUrl: assignResult.Location.PublicUrl,
Count: uint64(count),
Auth: security.EncodedJwt(assignResult.Auth),
}, nil
}
// Upload with auto-chunking
chunkResult, err := operation.UploadReaderInChunks(r.Context(), dataReader, &operation.ChunkedUploadOption{
ChunkSize: chunkSize,
SmallFileLimit: smallFileLimit,
Collection: collection,
DataCenter: s3a.option.DataCenter,
SaveSmallInline: false, // S3 API always creates chunks, never stores inline
MimeType: r.Header.Get("Content-Type"),
AssignFunc: assignFunc,
})
if err != nil {
glog.Errorf("putToFiler: failed to assign volume: %v", err)
return "", s3err.ErrInternalError, ""
}
// Step 2: Upload data directly to volume server
volumeUploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
// Read all data for upload (we need to calculate hash anyway)
data, readErr := io.ReadAll(body)
if readErr != nil {
glog.Errorf("putToFiler: failed to read data: %v", readErr)
return "", s3err.ErrInternalError, ""
}
// Calculate ETag for S3 API response (hex format)
md5Sum := hash.Sum(nil)
etag = fmt.Sprintf("%x", md5Sum)
glog.V(3).Infof("putToFiler: Uploading to volume - path=%s, size=%d, etag(hex)=%s, partNumber=%d",
filePath, len(data), etag, partNumber)
uploadOption := &operation.UploadOption{
UploadUrl: volumeUploadUrl,
Cipher: false,
IsInputCompressed: false,
MimeType: r.Header.Get("Content-Type"),
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploader, uploaderErr := operation.NewUploader()
if uploaderErr != nil {
glog.Errorf("putToFiler: failed to create uploader: %v", uploaderErr)
return "", s3err.ErrInternalError, ""
}
glog.V(3).Infof("putToFiler: Uploading to volume server - fileId=%s", assignResult.FileId)
uploadResult, uploadErr := uploader.UploadData(context.Background(), data, uploadOption)
if uploadErr != nil {
glog.Errorf("putToFiler: failed to upload to volume server for %s: %v", filePath, uploadErr)
if strings.Contains(uploadErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
glog.Errorf("putToFiler: chunked upload failed: %v", err)
if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, ""
}
return "", s3err.ErrInternalError, ""
}
glog.V(3).Infof("putToFiler: Volume upload SUCCESS - fileId=%s, size=%d, md5(base64)=%s",
assignResult.FileId, uploadResult.Size, uploadResult.ContentMd5)
// Step 3: Create metadata entry
now := time.Now()
mimeType := r.Header.Get("Content-Type")
if mimeType == "" {
mimeType = "application/octet-stream"
}
// Create file chunk
fid, fidErr := filer_pb.ToFileIdObject(assignResult.FileId)
if fidErr != nil {
glog.Errorf("putToFiler: failed to parse file ID: %v", fidErr)
return "", s3err.ErrInternalError, ""
}
// IMPORTANT: FileChunk.ETag must be base64-encoded (from uploadResult.ContentMd5)
// NOT hex-encoded etag! The filer.ETagChunks() function expects base64.
// ModifiedTsNs is critical for multipart race condition handling - when multiple
// uploads for the same part number occur, the one with the latest timestamp wins
fileChunk := &filer_pb.FileChunk{
FileId: assignResult.FileId,
Offset: 0,
Size: uint64(uploadResult.Size),
ModifiedTsNs: now.UnixNano(), // Set to current time for duplicate part resolution
ETag: uploadResult.ContentMd5, // Base64-encoded MD5 from volume server
Fid: fid,
CipherKey: uploadResult.CipherKey,
}
// Set SSE metadata in chunk (matches filer behavior)
// The filer stores SSE info in both entry.Extended AND chunk fields for detection/decryption
// Step 3: Calculate ETag and add SSE metadata to chunks
md5Sum := chunkResult.Md5Hash.Sum(nil)
etag = fmt.Sprintf("%x", md5Sum)
glog.V(3).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d, etag=%s",
filePath, len(chunkResult.FileChunks), chunkResult.TotalSize, etag)
// Add SSE metadata to all chunks if present
if customerKey != nil {
// SSE-C: Create per-chunk metadata (matches filer logic)
fileChunk.SseType = filer_pb.SSEType_SSE_C
if len(sseIV) > 0 {
ssecMetadataStruct := struct {
Algorithm string `json:"algorithm"`
IV string `json:"iv"`
KeyMD5 string `json:"keyMD5"`
PartOffset int64 `json:"partOffset"`
}{
Algorithm: "AES256",
IV: base64.StdEncoding.EncodeToString(sseIV),
KeyMD5: customerKey.KeyMD5,
PartOffset: partOffset,
}
if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
fileChunk.SseMetadata = ssecMetadata
for _, chunk := range chunkResult.FileChunks {
chunk.SseType = filer_pb.SSEType_SSE_C
if len(sseIV) > 0 {
ssecMetadataStruct := struct {
Algorithm string `json:"algorithm"`
IV string `json:"iv"`
KeyMD5 string `json:"keyMD5"`
PartOffset int64 `json:"partOffset"`
}{
Algorithm: "AES256",
IV: base64.StdEncoding.EncodeToString(sseIV),
KeyMD5: customerKey.KeyMD5,
PartOffset: chunk.Offset, // Use actual chunk offset
}
if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
chunk.SseMetadata = ssecMetadata
}
}
}
} else if sseKMSKey != nil {
// SSE-KMS: Store serialized metadata in chunk
fileChunk.SseType = filer_pb.SSEType_SSE_KMS
fileChunk.SseMetadata = sseKMSMetadata
// SSE-KMS: Store serialized metadata in all chunks
for _, chunk := range chunkResult.FileChunks {
chunk.SseType = filer_pb.SSEType_SSE_KMS
chunk.SseMetadata = sseKMSMetadata
}
} else if sseS3Key != nil {
// SSE-S3: Store serialized metadata in chunk
fileChunk.SseType = filer_pb.SSEType_SSE_S3
fileChunk.SseMetadata = sseS3Metadata
// SSE-S3: Store serialized metadata in all chunks
for _, chunk := range chunkResult.FileChunks {
chunk.SseType = filer_pb.SSEType_SSE_S3
chunk.SseMetadata = sseS3Metadata
}
}
glog.V(3).Infof("putToFiler: Created FileChunk - fileId=%s, size=%d, etag(base64)=%s (for multipart ETag calc)",
fileChunk.FileId, fileChunk.Size, fileChunk.ETag)
// Decode MD5 from base64 to bytes for entry.Attributes.Md5
md5Bytes, md5Err := base64.StdEncoding.DecodeString(uploadResult.ContentMd5)
if md5Err != nil {
glog.Errorf("putToFiler: failed to decode MD5 %s: %v", uploadResult.ContentMd5, md5Err)
return "", s3err.ErrInternalError, ""
// Step 4: Create metadata entry
now := time.Now()
mimeType := r.Header.Get("Content-Type")
if mimeType == "" {
mimeType = "application/octet-stream"
}
// Create entry
@@ -454,10 +420,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
Uid: 0,
Gid: 0,
Mime: mimeType,
FileSize: uint64(uploadResult.Size),
Md5: md5Bytes, // Set MD5 bytes for multipart ETag validation
FileSize: uint64(chunkResult.TotalSize),
Md5: md5Sum, // Set MD5 bytes for multipart ETag validation
},
Chunks: []*filer_pb.FileChunk{fileChunk},
Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
Extended: make(map[string][]byte),
}
@@ -549,7 +515,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
filePath, etag, entry.Attributes.FileSize, partNumber)
BucketTrafficReceived(int64(uploadResult.Size), r)
BucketTrafficReceived(chunkResult.TotalSize, r)
// Return the SSE type determined by the unified handler
return etag, s3err.ErrNone, sseResult.SSEType
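
A side note on the two MD5 encodings that appear in the comments and log lines above (hex etag in the S3 response, base64 ContentMd5 on each FileChunk): a minimal sketch, not part of the diff, showing both encodings of the same digest.

// Illustration only: the S3 ETag is the hex form of the MD5 digest, while the
// volume-server ContentMd5 (and filer_pb.FileChunk.ETag) carry the base64 form.
package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

func main() {
	sum := md5.Sum([]byte("hello world"))
	fmt.Printf("etag (hex):          %x\n", sum)                                       // 5eb63bbbe01eeed093cb22bb8f5acdc3
	fmt.Printf("ContentMd5 (base64): %s\n", base64.StdEncoding.EncodeToString(sum[:])) // XrY7u+Ae7tCTyyK7j1rNww==
}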
