From 7ef058d89a82381f0903982b1ac1dcde16d211cf Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 13 Nov 2025 23:26:36 -0800
Subject: [PATCH] directly read/write volume servers

---
 weed/s3api/s3api_object_handlers.go           | 186 ++++++++++-
 weed/s3api/s3api_object_handlers_multipart.go |   5 +-
 weed/s3api/s3api_object_handlers_put.go       | 297 ++++++++++++------
 3 files changed, 379 insertions(+), 109 deletions(-)

diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 98d0ffede..7b50a997b 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -2,12 +2,15 @@ package s3api
 
 import (
 	"bytes"
+	"context"
 	"encoding/base64"
 	"errors"
 	"fmt"
 	"io"
+	"mime"
 	"net/http"
 	"net/url"
+	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
@@ -15,6 +18,8 @@ import (
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -409,31 +414,178 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
 		}
 	}
 
+	// NEW OPTIMIZATION: Stream directly from volume servers, bypassing the filer proxy.
+	// This eliminates the ~19ms filer proxy overhead.
+
 	// Check if this is an SSE object for Range request handling
-	// This applies to both versioned and non-versioned objects
-	if originalRangeHeader != "" && objectEntryForSSE != nil {
-		primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
-		if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
-			sseObject = true
-			// Temporarily remove Range header to get full encrypted data from filer
-			r.Header.Del("Range")
+	primarySSEType := ""
+	if objectEntryForSSE != nil {
+		primarySSEType = s3a.detectPrimarySSEType(objectEntryForSSE)
+	}
+	if originalRangeHeader != "" && (primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS) {
+		sseObject = true
+		// Temporarily remove Range header to get full encrypted data
+		r.Header.Del("Range")
+	}
+
+	// Add SSE response headers before streaming
+	if objectEntryForSSE != nil {
+		// Create a fake response to get SSE headers
+		fakeResp := &http.Response{Header: make(http.Header)}
+		s3a.addSSEHeadersToResponse(fakeResp, objectEntryForSSE)
+		// Copy SSE headers to actual response
+		for k, v := range fakeResp.Header {
+			if strings.HasPrefix(k, "X-Amz-Server-Side-Encryption") {
+				w.Header()[k] = v
+			}
 		}
 	}
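+	// Note: addSSEHeadersToResponse writes into an *http.Response, so the
+	// fake response above is only a header container; the SSE headers are
+	// then copied onto the real ResponseWriter.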
 
-	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
-		// Restore the original Range header for SSE processing
-		if sseObject && originalRangeHeader != "" {
-			r.Header.Set("Range", originalRangeHeader)
+	// Restore the original Range header for SSE processing
+	if sseObject && originalRangeHeader != "" {
+		r.Header.Set("Range", originalRangeHeader)
+	}
+
+	// Stream directly from volume servers
+	err = s3a.streamFromVolumeServers(w, r, objectEntryForSSE, primarySSEType)
+	if err != nil {
+		glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
+		// Don't write error response - headers already sent
+		return
+	}
+}
+
+// streamFromVolumeServers streams object data directly from volume servers, bypassing the filer proxy.
+// This eliminates the ~19ms filer proxy overhead by reading chunks directly.
+func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
+	if entry == nil {
+		return fmt.Errorf("entry is nil")
+	}
+
+	// Get file size
+	totalSize := int64(filer.FileSize(entry))
+
+	// Set standard HTTP headers from entry metadata
+	s3a.setResponseHeaders(w, entry, totalSize)
+
+	// For small files stored inline in entry.Content
+	if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) {
+		_, err := w.Write(entry.Content)
+		return err
+	}
+
+	// Get chunks
+	chunks := entry.GetChunks()
+	if len(chunks) == 0 {
+		w.WriteHeader(http.StatusOK)
+		return nil
+	}
+
+	// Create lookup function via filer client
+	ctx := r.Context()
+	lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) {
+		var urls []string
+		err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			vid := filer.VolumeId(fileId)
+			resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+				VolumeIds: []string{vid},
+			})
+			if err != nil {
+				return err
+			}
+			if locs, found := resp.LocationsMap[vid]; found {
+				for _, loc := range locs.Locations {
+					urls = append(urls, "http://"+loc.Url+"/"+fileId)
+				}
+			}
+			return nil
+		})
+		return urls, err
+	}
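+	// lookupFileIdFn adapts the filer's LookupVolume RPC to the lookup
+	// signature the chunk-resolution and streaming helpers below expect.
+	// Results are not cached, so each chunk lookup costs one filer round trip.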
 
+	// Resolve chunk manifests
+	resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return fmt.Errorf("failed to resolve chunks: %v", err)
+	}
+
+	// Prepare streaming function with simple master client wrapper
+	masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+	streamFn, err := filer.PrepareStreamContentWithThrottler(
+		ctx,
+		masterClient,
+		func(fileId string) string {
+			// Use read signing key for volume server auth
+			return string(security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec))
+		},
+		resolvedChunks,
+		0,
+		totalSize,
+		0, // no throttling
+	)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return fmt.Errorf("failed to prepare stream: %v", err)
+	}
+
+	// Stream directly to response
+	return streamFn(w)
+}
+
+// setResponseHeaders sets all standard HTTP response headers from entry metadata
+func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
+	// Set content length and accept ranges
+	w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+	w.Header().Set("Accept-Ranges", "bytes")
+
+	// Set ETag
+	etag := filer.ETag(entry)
+	if etag != "" {
+		w.Header().Set("ETag", "\""+etag+"\"")
+	}
+
+	// Set Last-Modified in RFC1123 format
+	if entry.Attributes != nil {
+		modTime := time.Unix(entry.Attributes.Mtime, 0).UTC()
+		w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat))
+	}
+
+	// Set Content-Type
+	mimeType := ""
+	if entry.Attributes != nil && entry.Attributes.Mime != "" {
+		mimeType = entry.Attributes.Mime
+	}
+	if mimeType == "" {
+		// Try to detect from entry name
+		if entry.Name != "" {
+			ext := filepath.Ext(entry.Name)
+			if ext != "" {
+				mimeType = mime.TypeByExtension(ext)
+			}
+		}
+	}
+	if mimeType != "" {
+		w.Header().Set("Content-Type", mimeType)
+	} else {
+		w.Header().Set("Content-Type", "application/octet-stream")
+	}
 
-	// Add SSE metadata headers based on object metadata before SSE processing
-	if objectEntryForSSE != nil {
-		s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE)
+	// Set custom headers from entry.Extended (user metadata)
+	if entry.Extended != nil {
+		for k, v := range entry.Extended {
+			// Skip internal SeaweedFS headers
+			if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) {
+				w.Header().Set(k, string(v))
+			}
 		}
+	}
+}
 
-	// Handle SSE decryption (both SSE-C and SSE-KMS) if needed
-	return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE)
-	})
+// simpleMasterClient implements the minimal interface for streaming
+type simpleMasterClient struct {
+	lookupFn func(ctx context.Context, fileId string) ([]string, error)
+}
+
+func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+	return s.lookupFn
 }
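+
+// Compile-time check that simpleMasterClient satisfies the lookup interface
+// consumed by the streaming helpers (assumed to be wdclient.HasLookupFileIdFunction).
+var _ wdclient.HasLookupFileIdFunction = (*simpleMasterClient)(nil)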
 
 func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go
index ef1182fc2..ab03c0591 100644
--- a/weed/s3api/s3api_object_handlers_multipart.go
+++ b/weed/s3api/s3api_object_handlers_multipart.go
@@ -401,13 +401,16 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	}
 
 	destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-	etag, errCode, _ := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket, partID)
+	etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket, partID)
 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
 		return
 	}
 
 	setEtag(w, etag)
+
+	// Set SSE response headers for multipart uploads
+	s3a.setSSEResponseHeaders(w, r, sseType)
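+	// AWS S3 also returns the SSE headers on each UploadPart response, so
+	// part responses reuse the same helper as whole-object PUTs.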
 
 	writeSuccessResponseEmpty(w, r)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 6ce48429f..009412150 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -1,25 +1,25 @@ package s3api
 
 import (
+	"context"
 	"crypto/md5"
-	"encoding/base64"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/pquerna/cachecontrol/cacheobject"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
 	"github.com/seaweedfs/seaweedfs/weed/security"
-	weed_server "github.com/seaweedfs/seaweedfs/weed/server"
 	stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/util/constants"
 )
@@ -159,7 +159,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	case s3_constants.VersioningEnabled:
 		// Handle enabled versioning - create new versions with real version IDs
 		glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
-		versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+		versionId, etag, errCode, sseType := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
 		if errCode != s3err.ErrNone {
 			glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
 			s3err.WriteErrorResponse(w, r, errCode)
@@ -178,9 +178,13 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 
 		// Set ETag in response
 		setEtag(w, etag)
+
+		// Set SSE response headers for versioned objects
+		s3a.setSSEResponseHeaders(w, r, sseType)
+
 	case s3_constants.VersioningSuspended:
 		// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
-		etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+		etag, errCode, sseType := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
 		if errCode != s3err.ErrNone {
 			s3err.WriteErrorResponse(w, r, errCode)
 			return
@@ -191,6 +195,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 
 		// Set ETag in response
 		setEtag(w, etag)
+
+		// Set SSE response headers for suspended versioning
+		s3a.setSSEResponseHeaders(w, r, sseType)
 	default:
 		// Handle regular PUT (never configured versioning)
 		uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -209,9 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 		setEtag(w, etag)
 
 		// Set SSE response headers based on encryption type used
-		if sseType == s3_constants.SSETypeS3 {
-			w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-		}
+		s3a.setSSEResponseHeaders(w, r, sseType)
 		}
 	}
 	stats_collect.RecordBucketActiveTime(bucket)
@@ -221,6 +226,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 }
 
 func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
+	// NEW OPTIMIZATION: Write directly to volume servers, bypassing the filer proxy.
+	// This eliminates the filer proxy overhead for PUT operations.
+
 	// Calculate unique offset for each part to prevent IV reuse in multipart uploads
 	// This is critical for CTR mode encryption security
 	partOffset := calculatePartOffset(partNumber)
@@ -270,105 +278,184 @@
 		glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
 	}
 
+	// Parse the upload URL to extract the file path
+	// uploadUrl format: http://filer:8888/path/to/bucket/object
+	filePath := strings.TrimPrefix(uploadUrl, "http://"+string(s3a.option.Filer))
+
+	// Calculate MD5 hash
 	hash := md5.New()
 	var body = io.TeeReader(dataReader, hash)
 
-	proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
-
+	// Step 1: Assign volume from filer (via gRPC)
+	var assignResult *filer_pb.AssignVolumeResponse
+	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		collection := ""
+		if s3a.option.FilerGroup != "" {
+			collection = s3a.getCollectionName(bucket)
+		}
+		resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
+			Count:       1,
+			Replication: "",
+			Collection:  collection,
+			DiskType:    "",
+			DataCenter:  s3a.option.DataCenter,
+			Path:        filePath,
+		})
+		if err != nil {
+			return fmt.Errorf("assign volume: %w", err)
+		}
+		if resp.Error != "" {
+			return fmt.Errorf("assign volume: %v", resp.Error)
+		}
+		assignResult = resp
+		return nil
+	})
 	if err != nil {
-		glog.Errorf("NewRequest %s: %v", uploadUrl, err)
+		glog.Errorf("putToFiler: failed to assign volume: %v", err)
 		return "", s3err.ErrInternalError, ""
 	}
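+	// A single AssignVolume gRPC call replaces the assignment the filer
+	// previously performed internally while proxying; the returned FileId,
+	// Location, and Auth token are all that the direct upload below needs.
+	// Note that the upload step buffers the whole object in memory
+	// (io.ReadAll) rather than streaming it, trading memory for the saved
+	// proxy hop.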
 
-	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-	if destination != "" {
-		proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
+	// Step 2: Upload data directly to volume server
+	volumeUploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
+
+	// Read all data for upload (we need to calculate hash anyway)
+	data, readErr := io.ReadAll(body)
+	if readErr != nil {
+		glog.Errorf("putToFiler: failed to read data: %v", readErr)
+		return "", s3err.ErrInternalError, ""
 	}
 
-	if s3a.option.FilerGroup != "" {
-		query := proxyReq.URL.Query()
-		query.Add("collection", s3a.getCollectionName(bucket))
-		proxyReq.URL.RawQuery = query.Encode()
+	// Calculate ETag for S3 API response (hex format)
+	etag = fmt.Sprintf("%x", hash.Sum(nil))
+
+	uploadOption := &operation.UploadOption{
+		UploadUrl:         volumeUploadUrl,
+		Cipher:            false,
+		IsInputCompressed: false,
+		MimeType:          r.Header.Get("Content-Type"),
+		PairMap:           nil,
+		Jwt:               security.EncodedJwt(assignResult.Auth),
+	}
+
+	uploader, uploaderErr := operation.NewUploader()
+	if uploaderErr != nil {
+		glog.Errorf("putToFiler: failed to create uploader: %v", uploaderErr)
+		return "", s3err.ErrInternalError, ""
 	}
 
-	for header, values := range r.Header {
-		for _, value := range values {
-			proxyReq.Header.Add(header, value)
+	uploadResult, uploadErr := uploader.UploadData(context.Background(), data, uploadOption)
+	if uploadErr != nil {
+		glog.Errorf("putToFiler: failed to upload to volume server: %v", uploadErr)
+		if strings.Contains(uploadErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+			return "", s3err.ErrInvalidDigest, ""
 		}
+		return "", s3err.ErrInternalError, ""
 	}
 
-	// Log version ID header for debugging
-	if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
-		glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
+	// Step 3: Create metadata entry
+	now := time.Now()
+	mimeType := r.Header.Get("Content-Type")
+	if mimeType == "" {
+		mimeType = "application/octet-stream"
 	}
 
-	// Set object owner header for filer to extract
+	// Create file chunk
+	fid, fidErr := filer_pb.ToFileIdObject(assignResult.FileId)
+	if fidErr != nil {
+		glog.Errorf("putToFiler: failed to parse file ID: %v", fidErr)
+		return "", s3err.ErrInternalError, ""
+	}
+
+	// IMPORTANT: FileChunk.ETag must be base64-encoded (from uploadResult.ContentMd5),
+	// NOT the hex-encoded etag! The filer.ETagChunks() function expects base64.
+	fileChunk := &filer_pb.FileChunk{
+		FileId:    assignResult.FileId,
+		Offset:    0,
+		Size:      uint64(uploadResult.Size),
+		ETag:      uploadResult.ContentMd5, // Base64-encoded MD5 from volume server
+		Fid:       fid,
+		CipherKey: uploadResult.CipherKey,
+	}
+
+	// Create entry
+	entry := &filer_pb.Entry{
+		Name:        filepath.Base(filePath),
+		IsDirectory: false,
+		Attributes: &filer_pb.FuseAttributes{
+			Crtime:   now.Unix(),
+			Mtime:    now.Unix(),
+			FileMode: 0660,
+			Uid:      0,
+			Gid:      0,
+			Mime:     mimeType,
+			FileSize: uint64(uploadResult.Size),
+		},
+		Chunks:   []*filer_pb.FileChunk{fileChunk},
+		Extended: make(map[string][]byte),
+	}
+
+	// Set object owner
 	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
 	if amzAccountId != "" {
-		proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
-		glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
+		entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
+		glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
 	}
 
-	// Set SSE-C metadata headers for the filer if encryption was applied
-	if customerKey != nil && len(sseIV) > 0 {
-		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
-		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
-		// Store IV in a custom header that the filer can use to store in entry metadata
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
+	// Set version ID if present
+	if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+		entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
+		glog.V(0).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
 	}
 
-	// Set SSE-KMS metadata headers for the filer if KMS encryption was applied
-	if sseKMSKey != nil {
-		// Use already-serialized SSE-KMS metadata from helper function
-		// Store serialized KMS metadata in a custom header that the filer can use
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
+	// Set TTL-based S3 expiry (modification time)
+	entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
 
-		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
-	} else {
-		glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
-	}
-
-	// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
-	if sseS3Key != nil && len(sseS3Metadata) > 0 {
-		// Store serialized S3 metadata in a custom header that the filer can use
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
-		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
-	}
-	// Set TTL-based S3 expiry (modification time)
-	proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
-	// ensure that the Authorization header is overriding any previous
-	// Authorization header which might be already present in proxyReq
-	s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
-	resp, postErr := s3a.client.Do(proxyReq)
-
-	if postErr != nil {
-		glog.Errorf("post to filer: %v", postErr)
-		if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
-			return "", s3err.ErrInvalidDigest, ""
+	// Copy user metadata and standard headers
+	for k, v := range r.Header {
+		if len(v) > 0 && len(v[0]) > 0 {
+			if strings.HasPrefix(k, "X-Amz-Meta-") || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
+				entry.Extended[k] = []byte(v[0])
+			}
+			if k == "Response-Content-Disposition" {
+				entry.Extended["Content-Disposition"] = []byte(v[0])
+			}
 		}
-		return "", s3err.ErrInternalError, ""
 	}
-	defer resp.Body.Close()
 
-	etag = fmt.Sprintf("%x", hash.Sum(nil))
+	// Set SSE-C metadata
+	if customerKey != nil && len(sseIV) > 0 {
+		entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
+		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
+		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
+	}
 
-	resp_body, ra_err := io.ReadAll(resp.Body)
-	if ra_err != nil {
-		glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
-		return etag, s3err.ErrInternalError, ""
+	// Set SSE-KMS metadata
+	if sseKMSKey != nil {
+		entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader] = sseKMSMetadata
+		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", filePath, sseKMSKey.KeyID)
 	}
-	var ret weed_server.FilerPostResult
-	unmarshal_err := json.Unmarshal(resp_body, &ret)
-	if unmarshal_err != nil {
-		glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
-		return "", s3err.ErrInternalError, ""
+
+	// Set SSE-S3 metadata
+	if sseS3Key != nil && len(sseS3Metadata) > 0 {
+		entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
+		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", filePath, sseS3Key.KeyID)
 	}
-	if ret.Error != "" {
-		glog.Errorf("upload to filer error: %v", ret.Error)
-		return "", filerErrorToS3Error(ret.Error), ""
+
+	// Step 4: Save metadata to filer via gRPC
+	createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		req := &filer_pb.CreateEntryRequest{
+			Directory: filepath.Dir(filePath),
+			Entry:     entry,
+		}
+		_, err := client.CreateEntry(context.Background(), req)
+		return err
+	})
+	if createErr != nil {
+		glog.Errorf("putToFiler: failed to create entry: %v", createErr)
+		return "", filerErrorToS3Error(createErr.Error()), ""
 	}
 
-	BucketTrafficReceived(ret.Size, r)
+	BucketTrafficReceived(int64(uploadResult.Size), r)
 
 	// Return the SSE type determined by the unified handler
 	return etag, s3err.ErrNone, sseResult.SSEType
@@ -384,6 +471,34 @@ func setEtag(w http.ResponseWriter, etag string) {
 	}
 }
 
+// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
+func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseType string) {
+	switch sseType {
+	case s3_constants.SSETypeS3:
+		// SSE-S3: Return the encryption algorithm
+		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+
+	case s3_constants.SSETypeC:
+		// SSE-C: Echo back the customer-provided algorithm and key MD5
+		if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
+		}
+		if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
+		}
+
+	case s3_constants.SSETypeKMS:
+		// SSE-KMS: Return the KMS key ID and algorithm
+		w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+		if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
+		}
+		if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+		}
+	}
+}
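+
+// Assumption: when this helper runs on PUT, the SSE request headers are still
+// present on r, so echoing them back mirrors AWS S3 behavior; SSE-S3 takes no
+// request input and always reports AES256.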
+
 func filerErrorToS3Error(errString string) s3err.ErrorCode {
 	switch {
 	case errString == constants.ErrMsgBadDigest:
@@ -446,7 +561,7 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
 //
 // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
 // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseType string) {
 	// Normalize object path to ensure consistency with toFilerUrl behavior
 	normalizedObject := removeDuplicateSlashes(object)
 
@@ -528,7 +643,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 		parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
 		if err != nil {
 			glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
-			return "", s3err.ErrInvalidRequest
+			return "", s3err.ErrInvalidRequest, ""
 		}
 		r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
 		glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -540,7 +655,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 			glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
 		} else {
 			glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
-			return "", s3err.ErrInvalidRequest
+			return "", s3err.ErrInvalidRequest, ""
 		}
 	}
 
@@ -565,10 +680,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	if isTestObj {
 		glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
 	}
-	etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
+	etag, errCode, sseType = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
-		return "", errCode
+		return "", errCode, ""
 	}
 	if isTestObj {
 		glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
@@ -612,7 +727,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	if isTestObj {
 		glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
 	}
-	return etag, s3err.ErrNone
+	return etag, s3err.ErrNone, sseType
 }
 
 // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -684,7 +799,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
 	return nil
 }
 
-func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseType string) {
 	// Generate version ID
 	versionId = generateVersionId()
 
@@ -709,7 +824,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, ""
 	}
 
 	hash := md5.New()
@@ -720,10 +835,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 
 	glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
 
-	etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
+	etag, errCode, sseType = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
-		return "", "", errCode
+		return "", "", errCode, ""
 	}
 
 	// Get the uploaded entry to add versioning metadata
@@ -745,7 +860,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, ""
 	}
 
 	// Add versioning metadata to this version
@@ -766,7 +881,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	// Extract and store object lock metadata from request headers
 	if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
 		glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
-		return "", "", s3err.ErrInvalidRequest
+		return "", "", s3err.ErrInvalidRequest, ""
 	}
 
 	// Update the version entry with metadata
@@ -777,17 +892,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, ""
 	}
 
 	// Update the .versions directory metadata to indicate this is the latest version
 	err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, ""
 	}
 
 	glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
-	return versionId, etag, s3err.ErrNone
+	return versionId, etag, s3err.ErrNone, sseType
 }
 
 // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version