
Revert "handling directory"

This reverts commit 3a335f0ac3.
pull/7481/head
chrislu 2 weeks ago
commit abfb67ac74
  1. weed/s3api/s3api_object_handlers.go (2326 changes)
  2. weed/s3api/s3api_object_handlers_multipart.go (8 changes)
  3. weed/s3api/s3api_object_handlers_postpolicy.go (6 changes)
  4. weed/s3api/s3api_object_handlers_put.go (478 changes)

weed/s3api/s3api_object_handlers.go (2326 changes)
File diff suppressed because it is too large

weed/s3api/s3api_object_handlers_multipart.go (8 changes)

@@ -403,7 +403,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
 		bucket, object, uploadID, partID, r.ContentLength)
-	etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, partID)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
 			errCode, bucket, object, partID)
@@ -412,14 +412,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	}

 	glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
-		bucket, object, partID, etag, sseType)
+		bucket, object, partID, etag, sseMetadata.SSEType)

 	setEtag(w, etag)

 	// Set SSE response headers for multipart uploads
-	if sseType == s3_constants.SSETypeS3 {
-		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-	}
+	s3a.setSSEResponseHeaders(w, r, sseMetadata)

 	writeSuccessResponseEmpty(w, r)

weed/s3api/s3api_object_handlers_postpolicy.go (6 changes)

@@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
 		}
 	}

-	etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, fileBody, "", bucket, 1)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1)

 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
@@ -153,9 +153,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
 	setEtag(w, etag)

 	// Include SSE response headers (important for bucket-default encryption)
-	if sseType == s3_constants.SSETypeS3 {
-		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-	}
+	s3a.setSSEResponseHeaders(w, r, sseMetadata)

 	// Decide what http response to send depending on success_action_status parameter
 	switch successStatus {

weed/s3api/s3api_object_handlers_put.go (478 changes)

@@ -1,6 +1,7 @@
 package s3api

 import (
+	"context"
 	"crypto/md5"
 	"encoding/base64"
 	"encoding/json"
@@ -8,18 +9,21 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"

 	"github.com/pquerna/cachecontrol/cacheobject"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
 	"github.com/seaweedfs/seaweedfs/weed/security"
-	weed_server "github.com/seaweedfs/seaweedfs/weed/server"
 	stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
 	"github.com/seaweedfs/seaweedfs/weed/util/constants"
 )
@@ -60,6 +64,13 @@ type BucketDefaultEncryptionResult struct {
 	SSEKMSKey *SSEKMSKey
 }

+// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
+type SSEResponseMetadata struct {
+	SSEType          string
+	KMSKeyID         string
+	BucketKeyEnabled bool
+}
+
 func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {

 	// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
@@ -135,7 +146,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
 	versioningConfigured := (versioningState != "")
-	glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
+	glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)

 	// Validate object lock headers before processing
 	if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -158,29 +169,34 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 		switch versioningState {
 		case s3_constants.VersioningEnabled:
 			// Handle enabled versioning - create new versions with real version IDs
-			glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
-			versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+			glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
+			versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
 			if errCode != s3err.ErrNone {
 				glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
 				s3err.WriteErrorResponse(w, r, errCode)
 				return
 			}
-			glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
+			glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)

 			// Set version ID in response header
 			if versionId != "" {
 				w.Header().Set("x-amz-version-id", versionId)
-				glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
+				glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
 			} else {
 				glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object)
 			}

 			// Set ETag in response
 			setEtag(w, etag)
+
+			// Set SSE response headers for versioned objects
+			s3a.setSSEResponseHeaders(w, r, sseMetadata)
 		case s3_constants.VersioningSuspended:
 			// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
-			etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+			glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object)
+			etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
 			if errCode != s3err.ErrNone {
 				s3err.WriteErrorResponse(w, r, errCode)
 				return
@@ -191,6 +207,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 			// Set ETag in response
 			setEtag(w, etag)
+
+			// Set SSE response headers for suspended versioning
+			s3a.setSSEResponseHeaders(w, r, sseMetadata)
 		default:
 			// Handle regular PUT (never configured versioning)
 			uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -198,7 +217,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 				dataReader = mimeDetect(r, dataReader)
 			}

-			etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
+			etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)

 			if errCode != s3err.ErrNone {
 				s3err.WriteErrorResponse(w, r, errCode)
@@ -209,9 +228,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 			setEtag(w, etag)

 			// Set SSE response headers based on encryption type used
-			if sseType == s3_constants.SSETypeS3 {
-				w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
-			}
+			s3a.setSSEResponseHeaders(w, r, sseMetadata)
 		}
 	}

 	stats_collect.RecordBucketActiveTime(bucket)
@@ -220,15 +237,18 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	writeSuccessResponseEmpty(w, r)
 }

-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
-	// Calculate unique offset for each part to prevent IV reuse in multipart uploads
-	// This is critical for CTR mode encryption security
-	partOffset := calculatePartOffset(partNumber)
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+	// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
+	// This eliminates the filer proxy overhead for PUT operations

-	// Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
+	// For SSE, encrypt with offset=0 for all parts
+	// Each part is encrypted independently, then decrypted using metadata during GET
+	partOffset := int64(0)
+
+	// Handle all SSE encryption types in a unified manner
 	sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
 	if sseErrorCode != s3err.ErrNone {
-		return "", sseErrorCode, ""
+		return "", sseErrorCode, SSEResponseMetadata{}
 	}

 	// Extract results from unified SSE handling
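Note on the offset change above: every part is now encrypted from offset 0, and each chunk later records its PartOffset so a GET can seek into the CTR keystream. For AES-CTR, seeking only requires advancing the big-endian counter held in the IV by the number of 16-byte blocks skipped. A minimal self-contained sketch of that idea, with hypothetical helper names (not the project's actual functions) and block-aligned offsets assumed:

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// ivForOffset treats the 16-byte IV as a big-endian counter and adds the
// number of whole AES blocks that offset bytes skip. Assumes offset is
// block-aligned, as chunk boundaries are here.
func ivForOffset(baseIV []byte, offset int64) []byte {
	iv := make([]byte, len(baseIV))
	copy(iv, baseIV)
	carry := uint64(offset) / aes.BlockSize
	for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return iv
}

func main() {
	key := bytes.Repeat([]byte{1}, 32)
	baseIV := bytes.Repeat([]byte{2}, aes.BlockSize)
	plain := bytes.Repeat([]byte("0123456789abcdef"), 4) // 64 bytes

	// Encrypt the whole stream from offset 0, as the new code path does.
	block, _ := aes.NewCipher(key)
	enc := make([]byte, len(plain))
	cipher.NewCTR(block, baseIV).XORKeyStream(enc, plain)

	// Decrypt just the chunk at PartOffset=32 with an advanced IV.
	dec := make([]byte, 16)
	cipher.NewCTR(block, ivForOffset(baseIV, 32)).XORKeyStream(dec, enc[32:48])
	fmt.Println(bytes.Equal(dec, plain[32:48])) // true
}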
@@ -249,7 +269,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 		encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
 		if applyErr != nil {
 			glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
-			return "", s3err.ErrInternalError, ""
+			return "", s3err.ErrInternalError, SSEResponseMetadata{}
 		}

 		// Update variables based on the result
@@ -263,115 +283,289 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 			sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
 			if metaErr != nil {
 				glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
-				return "", s3err.ErrInternalError, ""
+				return "", s3err.ErrInternalError, SSEResponseMetadata{}
 			}
 		}
 	} else {
 		glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
 	}

-	hash := md5.New()
-	var body = io.TeeReader(dataReader, hash)
+	// Parse the upload URL to extract the file path
+	// uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
+	// Use proper URL parsing instead of string manipulation for robustness
+	parsedUrl, parseErr := url.Parse(uploadUrl)
+	if parseErr != nil {
+		glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
+		return "", s3err.ErrInternalError, SSEResponseMetadata{}
+	}
+
+	// Use parsedUrl.Path directly - it's already decoded by url.Parse()
+	// Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
+	// Calling PathUnescape again would double-decode and fail on keys like "b%ar"
+	filePath := parsedUrl.Path
+
+	// Step 1 & 2: Use auto-chunking to handle large files without OOM
+	// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
+	const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
+	const smallFileLimit = 256 * 1024 // 256KB - store inline in filer

-	proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
+	collection := ""
+	if s3a.option.FilerGroup != "" {
+		collection = s3a.getCollectionName(bucket)
+	}
+
+	// Create assign function for chunked upload
+	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+		var assignResult *filer_pb.AssignVolumeResponse
+		err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
+				Count:       int32(count),
+				Replication: "",
+				Collection:  collection,
+				DiskType:    "",
+				DataCenter:  s3a.option.DataCenter,
+				Path:        filePath,
+			})
+			if err != nil {
+				return fmt.Errorf("assign volume: %w", err)
+			}
+			if resp.Error != "" {
+				return fmt.Errorf("assign volume: %v", resp.Error)
+			}
+			assignResult = resp
+			return nil
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+		// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
+		return nil, &operation.AssignResult{
+			Fid:       assignResult.FileId,
+			Url:       assignResult.Location.Url,
+			PublicUrl: assignResult.Location.PublicUrl,
+			Count:     uint64(count),
+			Auth:      security.EncodedJwt(assignResult.Auth),
+		}, nil
+	}
+
+	// Upload with auto-chunking
+	// Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled
+	// This prevents partial uploads and data corruption
+	chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{
+		ChunkSize:       chunkSize,
+		SmallFileLimit:  smallFileLimit,
+		Collection:      collection,
+		DataCenter:      s3a.option.DataCenter,
+		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
+		MimeType:        r.Header.Get("Content-Type"),
+		AssignFunc:      assignFunc,
+	})
 	if err != nil {
-		glog.Errorf("NewRequest %s: %v", uploadUrl, err)
-		return "", s3err.ErrInternalError, ""
+		glog.Errorf("putToFiler: chunked upload failed: %v", err)
+		if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+			return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
+		}
+		return "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}

-	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-	if destination != "" {
-		proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
-	}
+	// Step 3: Calculate MD5 hash and add SSE metadata to chunks
+	md5Sum := chunkResult.Md5Hash.Sum(nil)

-	if s3a.option.FilerGroup != "" {
-		query := proxyReq.URL.Query()
-		query.Add("collection", s3a.getCollectionName(bucket))
-		proxyReq.URL.RawQuery = query.Encode()
-	}
+	glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d",
+		filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)

-	for header, values := range r.Header {
-		for _, value := range values {
-			proxyReq.Header.Add(header, value)
+	// Log chunk details for debugging (verbose only - high frequency)
+	if glog.V(4) {
+		for i, chunk := range chunkResult.FileChunks {
+			glog.Infof("  PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size)
 		}
 	}

-	// Log version ID header for debugging
-	if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
-		glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
+	// Add SSE metadata to all chunks if present
+	if customerKey != nil {
+		// SSE-C: Create per-chunk metadata (matches filer logic)
+		for _, chunk := range chunkResult.FileChunks {
+			chunk.SseType = filer_pb.SSEType_SSE_C
+			if len(sseIV) > 0 {
+				// PartOffset tracks position within the encrypted stream
+				// Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0,
+				// PartOffset = chunk.Offset represents where this chunk is in that encrypted stream
+				// - Single-part: chunk.Offset is position in the file's encrypted stream
+				// - Multipart: chunk.Offset is position in this part's encrypted stream
+				ssecMetadataStruct := struct {
+					Algorithm  string `json:"algorithm"`
+					IV         string `json:"iv"`
+					KeyMD5     string `json:"keyMD5"`
+					PartOffset int64  `json:"partOffset"`
+				}{
+					Algorithm:  "AES256",
+					IV:         base64.StdEncoding.EncodeToString(sseIV),
+					KeyMD5:     customerKey.KeyMD5,
+					PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0)
+				}
+				if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
+					chunk.SseMetadata = ssecMetadata
+				}
+			}
+		}
+	} else if sseKMSKey != nil {
+		// SSE-KMS: Store serialized metadata in all chunks
+		for _, chunk := range chunkResult.FileChunks {
+			chunk.SseType = filer_pb.SSEType_SSE_KMS
+			chunk.SseMetadata = sseKMSMetadata
+		}
+	} else if sseS3Key != nil {
+		// SSE-S3: Store serialized metadata in all chunks
+		for _, chunk := range chunkResult.FileChunks {
+			chunk.SseType = filer_pb.SSEType_SSE_S3
+			chunk.SseMetadata = sseS3Metadata
+		}
 	}

-	// Set object owner header for filer to extract
+	// Step 4: Create metadata entry
+	now := time.Now()
+	mimeType := r.Header.Get("Content-Type")
+	if mimeType == "" {
+		mimeType = "application/octet-stream"
+	}
+
+	// Create entry
+	entry := &filer_pb.Entry{
+		Name:        filepath.Base(filePath),
+		IsDirectory: false,
+		Attributes: &filer_pb.FuseAttributes{
+			Crtime:   now.Unix(),
+			Mtime:    now.Unix(),
+			FileMode: 0660,
+			Uid:      0,
+			Gid:      0,
+			Mime:     mimeType,
+			FileSize: uint64(chunkResult.TotalSize),
+		},
+		Chunks:   chunkResult.FileChunks, // All chunks from auto-chunking
+		Extended: make(map[string][]byte),
+	}
+
+	// Set Md5 attribute based on context:
+	// 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5
+	//    - Parts must use simple MD5 ETags, never composite format
+	//    - Even if a part has multiple chunks internally, its ETag is MD5 of entire part
+	// 2. For regular object uploads: only set Md5 for single-chunk uploads
+	//    - Multi-chunk regular objects use composite "md5-count" format
+	isMultipartPart := strings.Contains(uploadUrl, "/.uploads/")
+	if isMultipartPart || len(chunkResult.FileChunks) == 1 {
+		entry.Attributes.Md5 = md5Sum
+	}
+
+	// Calculate ETag using the same logic as GET to ensure consistency
+	// For single chunk: uses entry.Attributes.Md5
+	// For multiple chunks: uses filer.ETagChunks() which returns "<hash>-<count>"
+	etag = filer.ETag(entry)
+	glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks))
+
+	// Set object owner
 	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
 	if amzAccountId != "" {
-		proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
-		glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
+		entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
+		glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
+	}
+
+	// Set version ID if present
+	if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+		entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
+		glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
 	}

-	// Set SSE-C metadata headers for the filer if encryption was applied
+	// Set TTL-based S3 expiry
+	entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+
+	// Copy user metadata and standard headers
+	for k, v := range r.Header {
+		if len(v) > 0 && len(v[0]) > 0 {
+			if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
+				// Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo)
+				// We store them as they come in (after canonicalization) to preserve the user's intent
+				entry.Extended[k] = []byte(v[0])
+			} else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
+				entry.Extended[k] = []byte(v[0])
+			}
+			if k == "Response-Content-Disposition" {
+				entry.Extended["Content-Disposition"] = []byte(v[0])
+			}
+		}
+	}
+
+	// Set SSE-C metadata
 	if customerKey != nil && len(sseIV) > 0 {
-		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
-		proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
-		// Store IV in a custom header that the filer can use to store in entry metadata
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
+		// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+		entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
+		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
+		entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
+		glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV))
 	}

-	// Set SSE-KMS metadata headers for the filer if KMS encryption was applied
+	// Set SSE-KMS metadata
 	if sseKMSKey != nil {
-		// Use already-serialized SSE-KMS metadata from helper function
-		// Store serialized KMS metadata in a custom header that the filer can use
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
-		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
-	} else {
-		glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
+		// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+		entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
+		// Set standard SSE headers for detection
+		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
+		entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
+		glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
 	}

-	// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
+	// Set SSE-S3 metadata
 	if sseS3Key != nil && len(sseS3Metadata) > 0 {
-		// Store serialized S3 metadata in a custom header that the filer can use
-		proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
-		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
-	}
-
-	// Set TTL-based S3 expiry (modification time)
-	proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
-
-	// ensure that the Authorization header is overriding any previous
-	// Authorization header which might be already present in proxyReq
-	s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
-	resp, postErr := s3a.client.Do(proxyReq)
-
-	if postErr != nil {
-		glog.Errorf("post to filer: %v", postErr)
-		if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
-			return "", s3err.ErrInvalidDigest, ""
-		}
-		return "", s3err.ErrInternalError, ""
-	}
-	defer resp.Body.Close()
+		// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+		entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
+		// Set standard SSE header for detection
+		entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
+		glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
+	}

-	etag = fmt.Sprintf("%x", hash.Sum(nil))
+	// Step 4: Save metadata to filer via gRPC
+	// Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
+	// This matches the chunk upload behavior and prevents orphaned chunks
+	glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
+		filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
+	createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		req := &filer_pb.CreateEntryRequest{
+			Directory: filepath.Dir(filePath),
+			Entry:     entry,
+		}
+		glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
+		_, err := client.CreateEntry(context.Background(), req)
+		if err != nil {
+			glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
+		}
+		return err
+	})
+	if createErr != nil {
+		glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+		return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
+	}
+	glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)

-	resp_body, ra_err := io.ReadAll(resp.Body)
-	if ra_err != nil {
-		glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
-		return etag, s3err.ErrInternalError, ""
-	}
-	var ret weed_server.FilerPostResult
-	unmarshal_err := json.Unmarshal(resp_body, &ret)
-	if unmarshal_err != nil {
-		glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
-		return "", s3err.ErrInternalError, ""
-	}
-	if ret.Error != "" {
-		glog.Errorf("upload to filer error: %v", ret.Error)
-		return "", filerErrorToS3Error(ret.Error), ""
+	glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
+		filePath, etag, entry.Attributes.FileSize, partNumber)
+
+	BucketTrafficReceived(chunkResult.TotalSize, r)
+
+	// Build SSE response metadata with encryption details
+	responseMetadata := SSEResponseMetadata{
+		SSEType: sseResult.SSEType,
 	}

-	BucketTrafficReceived(ret.Size, r)
+	// For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata
+	if sseKMSKey != nil {
+		responseMetadata.KMSKeyID = sseKMSKey.KeyID
+		responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled
+		glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v",
+			sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled)
+	}

-	// Return the SSE type determined by the unified handler
-	return etag, s3err.ErrNone, sseResult.SSEType
+	return etag, s3err.ErrNone, responseMetadata
 }

 func setEtag(w http.ResponseWriter, etag string) {
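The Md5/ETag rules spelled out in the comments above (plain MD5 for single-chunk objects and multipart parts, "<hash>-<count>" for multi-chunk objects) follow the usual S3 composite-ETag scheme. A rough sketch of that scheme as a standalone function; this illustrates the rule, not necessarily byte-for-byte what filer.ETag/ETagChunks computes:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// compositeETag mirrors the rule described above: one chunk yields the plain
// MD5 hex digest; multiple chunks yield md5(concatenated chunk MD5s) plus a
// "-<count>" suffix.
func compositeETag(chunkMD5s [][]byte) string {
	if len(chunkMD5s) == 1 {
		return hex.EncodeToString(chunkMD5s[0])
	}
	h := md5.New()
	for _, sum := range chunkMD5s {
		h.Write(sum)
	}
	return fmt.Sprintf("%s-%d", hex.EncodeToString(h.Sum(nil)), len(chunkMD5s))
}

func main() {
	a := md5.Sum([]byte("chunk-a"))
	b := md5.Sum([]byte("chunk-b"))
	fmt.Println(compositeETag([][]byte{a[:]}))       // single chunk: plain MD5
	fmt.Println(compositeETag([][]byte{a[:], b[:]})) // multi-chunk: "<hash>-2"
}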
@@ -384,6 +578,43 @@ func setEtag(w http.ResponseWriter, etag string) {
 	}
 }

+// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
+func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) {
+	switch sseMetadata.SSEType {
+	case s3_constants.SSETypeS3:
+		// SSE-S3: Return the encryption algorithm
+		w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+	case s3_constants.SSETypeC:
+		// SSE-C: Echo back the customer-provided algorithm and key MD5
+		if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
+		}
+		if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
+		}
+	case s3_constants.SSETypeKMS:
+		// SSE-KMS: Return the KMS key ID and algorithm
+		w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+		// Use metadata from stored encryption config (for bucket-default encryption)
+		// or fall back to request headers (for explicit encryption)
+		if sseMetadata.KMSKeyID != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID)
+		} else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
+		}
+		// Set bucket-key-enabled header if it was enabled
+		if sseMetadata.BucketKeyEnabled {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+		} else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
+			w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+		}
+	}
+}
+
 func filerErrorToS3Error(errString string) s3err.ErrorCode {
 	switch {
 	case errString == constants.ErrMsgBadDigest:
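For reference, after the SSE-KMS branch above runs, a client should observe headers like the following (header names follow the AWS wire format, which the s3_constants values presumably mirror; the key ID is a made-up example):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// What an SSE-KMS PUT response carries once setSSEResponseHeaders ran.
	h := http.Header{}
	h.Set("x-amz-server-side-encryption", "aws:kms")
	h.Set("x-amz-server-side-encryption-aws-kms-key-id", "example-key-id") // made-up value
	h.Set("x-amz-server-side-encryption-bucket-key-enabled", "true")
	for _, k := range []string{
		"x-amz-server-side-encryption",
		"x-amz-server-side-encryption-aws-kms-key-id",
		"x-amz-server-side-encryption-bucket-key-enabled",
	} {
		fmt.Printf("%s: %s\n", k, h.Get(k))
	}
}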
@@ -446,18 +677,18 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
 //
 // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
 // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
 	// Normalize object path to ensure consistency with toFilerUrl behavior
 	normalizedObject := removeDuplicateSlashes(object)

 	// Enable detailed logging for testobjbar
 	isTestObj := (normalizedObject == "testobjbar")
-	glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
+	glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
 		bucket, object, normalizedObject, isTestObj)

 	if isTestObj {
-		glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
+		glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
 	}

 	bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -470,20 +701,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
 	if err == nil {
 		// .versions directory exists
-		glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
+		glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
 		for _, entry := range entries {
 			if entry.Extended != nil {
 				if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
 					versionId := string(versionIdBytes)
-					glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
+					glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
 					if versionId == "null" {
 						// Only delete null version - preserve real versioned entries
-						glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions")
+						glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions")
 						err := s3a.rm(versionsDir, entry.Name, true, false)
 						if err != nil {
 							glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err)
 						} else {
-							glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version")
+							glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version")
 						}
 						break
 					}
@@ -491,7 +722,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 			}
 		}
 	} else {
-		glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
+		glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
 	}

 	uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
@@ -509,7 +740,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	// Set version ID to "null" for suspended versioning
 	r.Header.Set(s3_constants.ExtVersionIdKey, "null")
 	if isTestObj {
-		glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
+		glog.V(3).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
 			s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
 	}
@@ -528,7 +759,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 		parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
 		if err != nil {
 			glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
-			return "", s3err.ErrInvalidRequest
+			return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
 		}
 		r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
 		glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -540,7 +771,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 			glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
 		} else {
 			glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
-			return "", s3err.ErrInvalidRequest
+			return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
 		}
 	}
@@ -563,15 +794,15 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	// Upload the file using putToFiler - this will create the file with version metadata
 	if isTestObj {
-		glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
+		glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===")
 	}
-	etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
+	etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
-		return "", errCode
+		return "", errCode, SSEResponseMetadata{}
 	}
 	if isTestObj {
-		glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
+		glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
 	}

 	// Verify the metadata was set correctly during file creation
@@ -581,19 +812,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	for attempt := 1; attempt <= maxRetries; attempt++ {
 		verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
 		if verifyErr == nil {
-			glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
+			glog.V(3).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
 			if verifyEntry.Extended != nil {
 				if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
-					glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
+					glog.V(3).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
 				} else {
-					glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
+					glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
 				}
 			} else {
-				glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
+				glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
 			}
 			break
 		} else {
-			glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
+			glog.V(3).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
 		}
 		if attempt < maxRetries {
 			time.Sleep(time.Millisecond * 10)
@@ -610,9 +841,9 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)

 	if isTestObj {
-		glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
+		glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
 	}

-	return etag, s3err.ErrNone
+	return etag, s3err.ErrNone, sseMetadata
 }
 // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -684,7 +915,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
 	return nil
 }

-func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
 	// Generate version ID
 	versionId = generateVersionId()
@@ -709,7 +940,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}

 	hash := md5.New()
@@ -720,10 +951,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)

-	etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
+	etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
-		return "", "", errCode
+		return "", "", errCode, SSEResponseMetadata{}
 	}

 	// Get the uploaded entry to add versioning metadata
@@ -745,7 +976,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}

 	// Add versioning metadata to this version
@@ -766,7 +997,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	// Extract and store object lock metadata from request headers
 	if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
 		glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
-		return "", "", s3err.ErrInvalidRequest
+		return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
 	}

 	// Update the version entry with metadata
@@ -777,17 +1008,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	})
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}

 	// Update the .versions directory metadata to indicate this is the latest version
 	err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
 	if err != nil {
 		glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
-		return "", "", s3err.ErrInternalError
+		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
 	}

 	glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
-	return versionId, etag, s3err.ErrNone
+	return versionId, etag, s3err.ErrNone, sseMetadata
 }
 // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
@@ -963,7 +1194,8 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ
 	bucketKeyEnabled := encryptionConfig.BucketKeyEnabled

 	// Build encryption context for KMS
-	bucket, object := s3_constants.GetBucketAndObject(r)
+	// Use bucket parameter passed to function (not from request parsing)
+	_, object := s3_constants.GetBucketAndObject(r)
 	encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled)

 	// Create SSE-KMS encrypted reader
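BuildEncryptionContext now receives the bucket passed into the function rather than one re-parsed from the request. As a hedged sketch of what such a KMS encryption context conventionally contains (AWS binds S3 data keys to an "aws:s3:arn" entry: the bucket ARN when bucket keys are enabled, otherwise the object ARN; these are assumptions about the shape of SeaweedFS's actual helper, not its code):

package main

import (
	"fmt"
	"strings"
)

// buildEncryptionContext sketches a KMS encryption context for S3 objects.
// With bucket keys enabled the context covers the bucket ARN so one data key
// can serve the whole bucket; otherwise it pins the key to the object ARN.
func buildEncryptionContext(bucket, object string, bucketKeyEnabled bool) map[string]string {
	arn := "arn:aws:s3:::" + bucket
	if !bucketKeyEnabled {
		arn += "/" + strings.TrimPrefix(object, "/")
	}
	return map[string]string{"aws:s3:arn": arn}
}

func main() {
	fmt.Println(buildEncryptionContext("my-bucket", "/docs/a.txt", false)) // object ARN
	fmt.Println(buildEncryptionContext("my-bucket", "/docs/a.txt", true))  // bucket ARN
}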
