diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
new file mode 100644
index 000000000..d0a73819a
--- /dev/null
+++ b/weed/operation/upload_chunked.go
@@ -0,0 +1,214 @@
+package operation
+
+import (
+	"bytes"
+	"context"
+	"crypto/md5"
+	"fmt"
+	"hash"
+	"io"
+	"sync"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/security"
+)
+
+// ChunkedUploadResult contains the result of a chunked upload
+type ChunkedUploadResult struct {
+	FileChunks   []*filer_pb.FileChunk
+	Md5Hash      hash.Hash
+	TotalSize    int64
+	SmallContent []byte // For files smaller than threshold
+}
+
+// ChunkedUploadOption contains options for chunked uploads
+type ChunkedUploadOption struct {
+	ChunkSize       int32
+	SmallFileLimit  int64
+	Collection      string
+	Replication     string
+	DataCenter      string
+	SaveSmallInline bool
+	Jwt             security.EncodedJwt
+	MimeType        string
+	AssignFunc      func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
+}
+
+var chunkBufferPool = sync.Pool{
+	New: func() interface{} {
+		return new(bytes.Buffer)
+	},
+}
+
+// UploadReaderInChunks reads from reader and uploads in chunks to volume servers
+// This prevents OOM by processing the stream in fixed-size chunks
+// Returns file chunks, MD5 hash, total size, and any small content stored inline
+func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) {
+
+	md5Hash := md5.New()
+	var partReader = io.TeeReader(reader, md5Hash)
+
+	var fileChunks []*filer_pb.FileChunk
+	var fileChunksLock sync.Mutex
+	var uploadErr error
+	var uploadErrLock sync.Mutex
+	var chunkOffset int64 = 0
+
+	var wg sync.WaitGroup
+	var bytesBufferCounter int64 = 4
+	bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
+
+	for {
+		// Throttle buffer usage
+		bytesBufferLimitChan <- struct{}{}
+
+		// Check for errors from parallel uploads
+		uploadErrLock.Lock()
+		if uploadErr != nil {
+			<-bytesBufferLimitChan
+			uploadErrLock.Unlock()
+			break
+		}
+		uploadErrLock.Unlock()
+
+		// Get buffer from pool
+		bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer)
+		limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize))
+		bytesBuffer.Reset()
+
+		// Read one chunk
+		dataSize, err := bytesBuffer.ReadFrom(limitedReader)
+		if err != nil || dataSize == 0 {
+			chunkBufferPool.Put(bytesBuffer)
+			<-bytesBufferLimitChan
+			if err != nil {
+				uploadErrLock.Lock()
+				if uploadErr == nil {
+					uploadErr = err
+				}
+				uploadErrLock.Unlock()
+			}
+			break
+		}
+
+		// For small files at offset 0, store inline instead of uploading
+		if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit {
+			smallContent := make([]byte, dataSize)
+			bytesBuffer.Read(smallContent)
+			chunkBufferPool.Put(bytesBuffer)
+			<-bytesBufferLimitChan
+
+			return &ChunkedUploadResult{
+				FileChunks:   nil,
+				Md5Hash:      md5Hash,
+				TotalSize:    dataSize,
+				SmallContent: smallContent,
+			}, nil
+		}
+
+		// Upload chunk in parallel goroutine
+		wg.Add(1)
+		go func(offset int64, buf *bytes.Buffer) {
+			defer func() {
+				chunkBufferPool.Put(buf)
+				<-bytesBufferLimitChan
+				wg.Done()
+			}()
+
+			// Assign volume for this chunk
+			_, assignResult, assignErr := opt.AssignFunc(ctx, 1)
+			if assignErr != nil {
+				uploadErrLock.Lock()
+				if uploadErr == nil {
+					uploadErr = fmt.Errorf("assign volume: %w", assignErr)
+				}
+				uploadErrLock.Unlock()
+				return
+			}
+
+			// Upload chunk data
+			uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+			uploadOption := &UploadOption{
+				UploadUrl:         uploadUrl,
+				Cipher:            false,
+				IsInputCompressed: false,
+				MimeType:          opt.MimeType,
+				PairMap:           nil,
+				Jwt:               opt.Jwt,
+			}
+
+			uploader, uploaderErr := NewUploader()
+			if uploaderErr != nil {
+				uploadErrLock.Lock()
+				if uploadErr == nil {
+					uploadErr = fmt.Errorf("create uploader: %w", uploaderErr)
+				}
+				uploadErrLock.Unlock()
+				return
+			}
+
+			uploadResult, uploadResultErr := uploader.UploadData(ctx, buf.Bytes(), uploadOption)
+			if uploadResultErr != nil {
+				uploadErrLock.Lock()
+				if uploadErr == nil {
+					uploadErr = fmt.Errorf("upload chunk: %w", uploadResultErr)
+				}
+				uploadErrLock.Unlock()
+				return
+			}
+
+			// Create chunk entry
+			fid, _ := filer_pb.ToFileIdObject(assignResult.Fid)
+			chunk := &filer_pb.FileChunk{
+				FileId:    assignResult.Fid,
+				Offset:    offset,
+				Size:      uint64(uploadResult.Size),
+				ETag:      uploadResult.ContentMd5,
+				Fid:       fid,
+				CipherKey: uploadResult.CipherKey,
+			}
+
+			fileChunksLock.Lock()
+			fileChunks = append(fileChunks, chunk)
+			glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
+			fileChunksLock.Unlock()
+
+		}(chunkOffset, bytesBuffer)
+
+		// Update offset for next chunk
+		chunkOffset += dataSize
+
+		// If this was a partial chunk, we're done
+		if dataSize < int64(opt.ChunkSize) {
+			break
+		}
+	}
+
+	// Wait for all uploads to complete
+	wg.Wait()
+
+	// Check for errors
+	if uploadErr != nil {
+		glog.Errorf("chunked upload failed: %v", uploadErr)
+		return nil, uploadErr
+	}
+
+	// Sort chunks by offset
+	// Note: We could use slices.SortFunc here, but keeping it simple for Go 1.20 compatibility
+	for i := 0; i < len(fileChunks); i++ {
+		for j := i + 1; j < len(fileChunks); j++ {
+			if fileChunks[i].Offset > fileChunks[j].Offset {
+				fileChunks[i], fileChunks[j] = fileChunks[j], fileChunks[i]
+			}
+		}
+	}
+
+	return &ChunkedUploadResult{
+		FileChunks:   fileChunks,
+		Md5Hash:      md5Hash,
+		TotalSize:    chunkOffset,
+		SmallContent: nil,
+	}, nil
+}
+
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index d8181f3dd..6ef0dcf97 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -296,151 +296,117 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 	}
 	filePath = decodedPath
 
-	// Calculate MD5 hash
-	hash := md5.New()
-	var body = io.TeeReader(dataReader, hash)
-
-	// Step 1: Assign volume from filer (via gRPC)
-	var assignResult *filer_pb.AssignVolumeResponse
-	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		collection := ""
-		if s3a.option.FilerGroup != "" {
-			collection = s3a.getCollectionName(bucket)
-		}
-		resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
-			Count:       1,
-			Replication: "",
-			Collection:  collection,
-			DiskType:    "",
-			DataCenter:  s3a.option.DataCenter,
-			Path:        filePath,
+	// Step 1 & 2: Use auto-chunking to handle large files without OOM
+	// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
+	const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
+	const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
+
+	collection := ""
+	if s3a.option.FilerGroup != "" {
+		collection = s3a.getCollectionName(bucket)
+	}
+
+	// Create assign function for chunked upload
+	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+		var assignResult *filer_pb.AssignVolumeResponse
+		err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
+				Count:       int32(count),
+				Replication: "",
+				Collection:  collection,
+				DiskType:    "",
+				DataCenter:  s3a.option.DataCenter,
+				Path:        filePath,
+			})
+			if err != nil {
+				return fmt.Errorf("assign volume: %w", err)
+			}
+			if resp.Error != "" {
+				return fmt.Errorf("assign volume: %v", resp.Error)
+			}
+			assignResult = resp
+			return nil
 		})
 		if err != nil {
-			return fmt.Errorf("assign volume: %w", err)
+			return nil, nil, err
 		}
-		if resp.Error != "" {
-			return fmt.Errorf("assign volume: %v", resp.Error)
-		}
-		assignResult = resp
-		return nil
+
+		// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
+		return nil, &operation.AssignResult{
+			Fid:       assignResult.FileId,
+			Url:       assignResult.Location.Url,
+			PublicUrl: assignResult.Location.PublicUrl,
+			Count:     uint64(count),
+			Auth:      security.EncodedJwt(assignResult.Auth),
+		}, nil
+	}
+
+	// Upload with auto-chunking
+	chunkResult, err := operation.UploadReaderInChunks(r.Context(), dataReader, &operation.ChunkedUploadOption{
+		ChunkSize:       chunkSize,
+		SmallFileLimit:  smallFileLimit,
+		Collection:      collection,
+		DataCenter:      s3a.option.DataCenter,
+		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
+		MimeType:        r.Header.Get("Content-Type"),
+		AssignFunc:      assignFunc,
 	})
 	if err != nil {
-		glog.Errorf("putToFiler: failed to assign volume: %v", err)
-		return "", s3err.ErrInternalError, ""
-	}
-
-	// Step 2: Upload data directly to volume server
-	volumeUploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
-
-	// Read all data for upload (we need to calculate hash anyway)
-	data, readErr := io.ReadAll(body)
-	if readErr != nil {
-		glog.Errorf("putToFiler: failed to read data: %v", readErr)
-		return "", s3err.ErrInternalError, ""
-	}
-
-	// Calculate ETag for S3 API response (hex format)
-	md5Sum := hash.Sum(nil)
-	etag = fmt.Sprintf("%x", md5Sum)
-
-	glog.V(3).Infof("putToFiler: Uploading to volume - path=%s, size=%d, etag(hex)=%s, partNumber=%d",
-		filePath, len(data), etag, partNumber)
-
-	uploadOption := &operation.UploadOption{
-		UploadUrl:         volumeUploadUrl,
-		Cipher:            false,
-		IsInputCompressed: false,
-		MimeType:          r.Header.Get("Content-Type"),
-		PairMap:           nil,
-		Jwt:               security.EncodedJwt(assignResult.Auth),
-	}
-
-	uploader, uploaderErr := operation.NewUploader()
-	if uploaderErr != nil {
-		glog.Errorf("putToFiler: failed to create uploader: %v", uploaderErr)
-		return "", s3err.ErrInternalError, ""
-	}
-
-	glog.V(3).Infof("putToFiler: Uploading to volume server - fileId=%s", assignResult.FileId)
-	uploadResult, uploadErr := uploader.UploadData(context.Background(), data, uploadOption)
-	if uploadErr != nil {
-		glog.Errorf("putToFiler: failed to upload to volume server for %s: %v", filePath, uploadErr)
-		if strings.Contains(uploadErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+		glog.Errorf("putToFiler: chunked upload failed: %v", err)
+		if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
 			return "", s3err.ErrInvalidDigest, ""
 		}
 		return "", s3err.ErrInternalError, ""
 	}
-
-	glog.V(3).Infof("putToFiler: Volume upload SUCCESS - fileId=%s, size=%d, md5(base64)=%s",
-		assignResult.FileId, uploadResult.Size, uploadResult.ContentMd5)
-
-	// Step 3: Create metadata entry
-	now := time.Now()
-	mimeType := r.Header.Get("Content-Type")
-	if mimeType == "" {
-		mimeType = "application/octet-stream"
"application/octet-stream" - } - - // Create file chunk - fid, fidErr := filer_pb.ToFileIdObject(assignResult.FileId) - if fidErr != nil { - glog.Errorf("putToFiler: failed to parse file ID: %v", fidErr) - return "", s3err.ErrInternalError, "" - } - - // IMPORTANT: FileChunk.ETag must be base64-encoded (from uploadResult.ContentMd5) - // NOT hex-encoded etag! The filer.ETagChunks() function expects base64. - // ModifiedTsNs is critical for multipart race condition handling - when multiple - // uploads for the same part number occur, the one with the latest timestamp wins - fileChunk := &filer_pb.FileChunk{ - FileId: assignResult.FileId, - Offset: 0, - Size: uint64(uploadResult.Size), - ModifiedTsNs: now.UnixNano(), // Set to current time for duplicate part resolution - ETag: uploadResult.ContentMd5, // Base64-encoded MD5 from volume server - Fid: fid, - CipherKey: uploadResult.CipherKey, - } - - // Set SSE metadata in chunk (matches filer behavior) - // The filer stores SSE info in both entry.Extended AND chunk fields for detection/decryption + + // Step 3: Calculate ETag and add SSE metadata to chunks + md5Sum := chunkResult.Md5Hash.Sum(nil) + etag = fmt.Sprintf("%x", md5Sum) + + glog.V(3).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d, etag=%s", + filePath, len(chunkResult.FileChunks), chunkResult.TotalSize, etag) + + // Add SSE metadata to all chunks if present if customerKey != nil { // SSE-C: Create per-chunk metadata (matches filer logic) - fileChunk.SseType = filer_pb.SSEType_SSE_C - if len(sseIV) > 0 { - ssecMetadataStruct := struct { - Algorithm string `json:"algorithm"` - IV string `json:"iv"` - KeyMD5 string `json:"keyMD5"` - PartOffset int64 `json:"partOffset"` - }{ - Algorithm: "AES256", - IV: base64.StdEncoding.EncodeToString(sseIV), - KeyMD5: customerKey.KeyMD5, - PartOffset: partOffset, - } - if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil { - fileChunk.SseMetadata = ssecMetadata + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_C + if len(sseIV) > 0 { + ssecMetadataStruct := struct { + Algorithm string `json:"algorithm"` + IV string `json:"iv"` + KeyMD5 string `json:"keyMD5"` + PartOffset int64 `json:"partOffset"` + }{ + Algorithm: "AES256", + IV: base64.StdEncoding.EncodeToString(sseIV), + KeyMD5: customerKey.KeyMD5, + PartOffset: chunk.Offset, // Use actual chunk offset + } + if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil { + chunk.SseMetadata = ssecMetadata + } } } } else if sseKMSKey != nil { - // SSE-KMS: Store serialized metadata in chunk - fileChunk.SseType = filer_pb.SSEType_SSE_KMS - fileChunk.SseMetadata = sseKMSMetadata + // SSE-KMS: Store serialized metadata in all chunks + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_KMS + chunk.SseMetadata = sseKMSMetadata + } } else if sseS3Key != nil { - // SSE-S3: Store serialized metadata in chunk - fileChunk.SseType = filer_pb.SSEType_SSE_S3 - fileChunk.SseMetadata = sseS3Metadata + // SSE-S3: Store serialized metadata in all chunks + for _, chunk := range chunkResult.FileChunks { + chunk.SseType = filer_pb.SSEType_SSE_S3 + chunk.SseMetadata = sseS3Metadata + } } - glog.V(3).Infof("putToFiler: Created FileChunk - fileId=%s, size=%d, etag(base64)=%s (for multipart ETag calc)", - fileChunk.FileId, fileChunk.Size, fileChunk.ETag) - - // Decode MD5 from base64 to bytes for entry.Attributes.Md5 - md5Bytes, md5Err := 
-	if md5Err != nil {
-		glog.Errorf("putToFiler: failed to decode MD5 %s: %v", uploadResult.ContentMd5, md5Err)
-		return "", s3err.ErrInternalError, ""
-	}
+	// Step 4: Create metadata entry
+	now := time.Now()
+	mimeType := r.Header.Get("Content-Type")
+	if mimeType == "" {
+		mimeType = "application/octet-stream"
+	}
 
 	// Create entry
@@ -454,10 +420,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 			Uid:      0,
 			Gid:      0,
 			Mime:     mimeType,
-			FileSize: uint64(uploadResult.Size),
-			Md5:      md5Bytes, // Set MD5 bytes for multipart ETag validation
+			FileSize: uint64(chunkResult.TotalSize),
+			Md5:      md5Sum, // Set MD5 bytes for multipart ETag validation
 		},
-		Chunks:   []*filer_pb.FileChunk{fileChunk},
+		Chunks:   chunkResult.FileChunks, // All chunks from auto-chunking
 		Extended: make(map[string][]byte),
 	}
 
@@ -549,7 +515,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 	glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
 		filePath, etag, entry.Attributes.FileSize, partNumber)
 
-	BucketTrafficReceived(int64(uploadResult.Size), r)
+	BucketTrafficReceived(chunkResult.TotalSize, r)
 
 	// Return the SSE type determined by the unified handler
 	return etag, s3err.ErrNone, sseResult.SSEType
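
For reference, a minimal sketch of the calling contract that the new operation.UploadReaderInChunks helper expects, assuming a standalone main package and a stubbed AssignFunc; both are illustrative only and not part of this change. A real caller supplies a working volume-assignment callback, such as the filer-backed assignFunc wired up in putToFiler above.

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/operation"
)

func main() {
	// 20MB of input is read and uploaded as 8MB-sized chunks, never buffered whole.
	payload := bytes.NewReader(make([]byte, 20<<20))

	result, err := operation.UploadReaderInChunks(context.Background(), payload, &operation.ChunkedUploadOption{
		ChunkSize:       8 * 1024 * 1024,
		SmallFileLimit:  256 * 1024,
		SaveSmallInline: false,
		MimeType:        "application/octet-stream",
		// AssignFunc is invoked once per chunk. This stub is hypothetical and must be
		// replaced with a real assignment call (e.g. the filer AssignVolume RPC used above).
		AssignFunc: func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
			return nil, nil, fmt.Errorf("assign volume stub: wire up a real assignment here")
		},
	})
	if err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	fmt.Printf("uploaded %d bytes in %d chunks, md5=%x\n",
		result.TotalSize, len(result.FileChunks), result.Md5Hash.Sum(nil))
}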