|
|
@ -1,7 +1,6 @@ |
|
|
package s3api |
|
|
package s3api |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
|
|
|
"crypto/md5" |
|
|
"crypto/md5" |
|
|
"encoding/base64" |
|
|
"encoding/base64" |
|
|
"encoding/json" |
|
|
"encoding/json" |
|
|
@ -9,21 +8,18 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"io" |
|
|
"io" |
|
|
"net/http" |
|
|
"net/http" |
|
|
"net/url" |
|
|
|
|
|
"path/filepath" |
|
|
|
|
|
"strconv" |
|
|
"strconv" |
|
|
"strings" |
|
|
"strings" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/pquerna/cachecontrol/cacheobject" |
|
|
"github.com/pquerna/cachecontrol/cacheobject" |
|
|
"github.com/seaweedfs/seaweedfs/weed/filer" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/operation" |
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" |
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" |
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" |
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" |
|
|
"github.com/seaweedfs/seaweedfs/weed/security" |
|
|
"github.com/seaweedfs/seaweedfs/weed/security" |
|
|
|
|
|
weed_server "github.com/seaweedfs/seaweedfs/weed/server" |
|
|
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" |
|
|
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util/constants" |
|
|
"github.com/seaweedfs/seaweedfs/weed/util/constants" |
|
|
) |
|
|
) |
|
|
@ -64,13 +60,6 @@ type BucketDefaultEncryptionResult struct { |
|
|
SSEKMSKey *SSEKMSKey |
|
|
SSEKMSKey *SSEKMSKey |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
|
|
|
|
|
|
type SSEResponseMetadata struct { |
|
|
|
|
|
SSEType string |
|
|
|
|
|
KMSKeyID string |
|
|
|
|
|
BucketKeyEnabled bool |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { |
|
|
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
|
|
|
|
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
|
|
|
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
|
|
|
@ -146,7 +135,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
versioningEnabled := (versioningState == s3_constants.VersioningEnabled) |
|
|
versioningEnabled := (versioningState == s3_constants.VersioningEnabled) |
|
|
versioningConfigured := (versioningState != "") |
|
|
versioningConfigured := (versioningState != "") |
|
|
|
|
|
|
|
|
glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) |
|
|
|
|
|
|
|
|
glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) |
|
|
|
|
|
|
|
|
// Validate object lock headers before processing
|
|
|
// Validate object lock headers before processing
|
|
|
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { |
|
|
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { |
|
|
@ -169,34 +158,29 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
switch versioningState { |
|
|
switch versioningState { |
|
|
case s3_constants.VersioningEnabled: |
|
|
case s3_constants.VersioningEnabled: |
|
|
// Handle enabled versioning - create new versions with real version IDs
|
|
|
// Handle enabled versioning - create new versions with real version IDs
|
|
|
glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) |
|
|
|
|
|
versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) |
|
|
|
|
|
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) |
|
|
if errCode != s3err.ErrNone { |
|
|
if errCode != s3err.ErrNone { |
|
|
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object) |
|
|
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object) |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) |
|
|
|
|
|
|
|
|
// Set version ID in response header
|
|
|
// Set version ID in response header
|
|
|
if versionId != "" { |
|
|
if versionId != "" { |
|
|
w.Header().Set("x-amz-version-id", versionId) |
|
|
w.Header().Set("x-amz-version-id", versionId) |
|
|
glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) |
|
|
} else { |
|
|
} else { |
|
|
glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object) |
|
|
glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set ETag in response
|
|
|
// Set ETag in response
|
|
|
setEtag(w, etag) |
|
|
setEtag(w, etag) |
|
|
|
|
|
|
|
|
// Set SSE response headers for versioned objects
|
|
|
|
|
|
s3a.setSSEResponseHeaders(w, r, sseMetadata) |
|
|
|
|
|
|
|
|
|
|
|
case s3_constants.VersioningSuspended: |
|
|
case s3_constants.VersioningSuspended: |
|
|
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
|
|
|
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
|
|
|
glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object) |
|
|
|
|
|
etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) |
|
|
|
|
|
|
|
|
etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) |
|
|
if errCode != s3err.ErrNone { |
|
|
if errCode != s3err.ErrNone { |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
return |
|
|
return |
|
|
@ -207,9 +191,6 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
|
|
|
|
|
|
// Set ETag in response
|
|
|
// Set ETag in response
|
|
|
setEtag(w, etag) |
|
|
setEtag(w, etag) |
|
|
|
|
|
|
|
|
// Set SSE response headers for suspended versioning
|
|
|
|
|
|
s3a.setSSEResponseHeaders(w, r, sseMetadata) |
|
|
|
|
|
default: |
|
|
default: |
|
|
// Handle regular PUT (never configured versioning)
|
|
|
// Handle regular PUT (never configured versioning)
|
|
|
uploadUrl := s3a.toFilerUrl(bucket, object) |
|
|
uploadUrl := s3a.toFilerUrl(bucket, object) |
|
|
@ -217,7 +198,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
dataReader = mimeDetect(r, dataReader) |
|
|
dataReader = mimeDetect(r, dataReader) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1) |
|
|
|
|
|
|
|
|
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1) |
|
|
|
|
|
|
|
|
if errCode != s3err.ErrNone { |
|
|
if errCode != s3err.ErrNone { |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
s3err.WriteErrorResponse(w, r, errCode) |
|
|
@ -228,7 +209,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
setEtag(w, etag) |
|
|
setEtag(w, etag) |
|
|
|
|
|
|
|
|
// Set SSE response headers based on encryption type used
|
|
|
// Set SSE response headers based on encryption type used
|
|
|
s3a.setSSEResponseHeaders(w, r, sseMetadata) |
|
|
|
|
|
|
|
|
if sseType == s3_constants.SSETypeS3 { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
stats_collect.RecordBucketActiveTime(bucket) |
|
|
stats_collect.RecordBucketActiveTime(bucket) |
|
|
@ -237,18 +220,15 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) |
|
|
writeSuccessResponseEmpty(w, r) |
|
|
writeSuccessResponseEmpty(w, r) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { |
|
|
|
|
|
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
|
|
|
|
|
|
// This eliminates the filer proxy overhead for PUT operations
|
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) { |
|
|
|
|
|
// Calculate unique offset for each part to prevent IV reuse in multipart uploads
|
|
|
|
|
|
// This is critical for CTR mode encryption security
|
|
|
|
|
|
partOffset := calculatePartOffset(partNumber) |
|
|
|
|
|
|
|
|
// For SSE, encrypt with offset=0 for all parts
|
|
|
|
|
|
// Each part is encrypted independently, then decrypted using metadata during GET
|
|
|
|
|
|
partOffset := int64(0) |
|
|
|
|
|
|
|
|
|
|
|
// Handle all SSE encryption types in a unified manner
|
|
|
|
|
|
|
|
|
// Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
|
|
|
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) |
|
|
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) |
|
|
if sseErrorCode != s3err.ErrNone { |
|
|
if sseErrorCode != s3err.ErrNone { |
|
|
return "", sseErrorCode, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", sseErrorCode, "" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Extract results from unified SSE handling
|
|
|
// Extract results from unified SSE handling
|
|
|
@ -269,7 +249,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader |
|
|
encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader) |
|
|
encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader) |
|
|
if applyErr != nil { |
|
|
if applyErr != nil { |
|
|
glog.Errorf("Failed to apply bucket default encryption: %v", applyErr) |
|
|
glog.Errorf("Failed to apply bucket default encryption: %v", applyErr) |
|
|
return "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Update variables based on the result
|
|
|
// Update variables based on the result
|
|
|
@ -283,289 +263,115 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader |
|
|
sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key) |
|
|
sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key) |
|
|
if metaErr != nil { |
|
|
if metaErr != nil { |
|
|
glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr) |
|
|
glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr) |
|
|
return "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption") |
|
|
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Parse the upload URL to extract the file path
|
|
|
|
|
|
// uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
|
|
|
|
|
|
// Use proper URL parsing instead of string manipulation for robustness
|
|
|
|
|
|
parsedUrl, parseErr := url.Parse(uploadUrl) |
|
|
|
|
|
if parseErr != nil { |
|
|
|
|
|
glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr) |
|
|
|
|
|
return "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Use parsedUrl.Path directly - it's already decoded by url.Parse()
|
|
|
|
|
|
// Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
|
|
|
|
|
|
// Calling PathUnescape again would double-decode and fail on keys like "b%ar"
|
|
|
|
|
|
filePath := parsedUrl.Path |
|
|
|
|
|
|
|
|
hash := md5.New() |
|
|
|
|
|
var body = io.TeeReader(dataReader, hash) |
|
|
|
|
|
|
|
|
// Step 1 & 2: Use auto-chunking to handle large files without OOM
|
|
|
|
|
|
// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
|
|
|
|
|
|
const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
|
|
|
|
|
|
const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
|
|
|
|
|
|
|
|
|
proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body) |
|
|
|
|
|
|
|
|
collection := "" |
|
|
|
|
|
if s3a.option.FilerGroup != "" { |
|
|
|
|
|
collection = s3a.getCollectionName(bucket) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create assign function for chunked upload
|
|
|
|
|
|
assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) { |
|
|
|
|
|
var assignResult *filer_pb.AssignVolumeResponse |
|
|
|
|
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
|
|
resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{ |
|
|
|
|
|
Count: int32(count), |
|
|
|
|
|
Replication: "", |
|
|
|
|
|
Collection: collection, |
|
|
|
|
|
DiskType: "", |
|
|
|
|
|
DataCenter: s3a.option.DataCenter, |
|
|
|
|
|
Path: filePath, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("assign volume: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
if resp.Error != "" { |
|
|
|
|
|
return fmt.Errorf("assign volume: %v", resp.Error) |
|
|
|
|
|
} |
|
|
|
|
|
assignResult = resp |
|
|
|
|
|
return nil |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, nil, err |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
|
|
|
|
|
|
return nil, &operation.AssignResult{ |
|
|
|
|
|
Fid: assignResult.FileId, |
|
|
|
|
|
Url: assignResult.Location.Url, |
|
|
|
|
|
PublicUrl: assignResult.Location.PublicUrl, |
|
|
|
|
|
Count: uint64(count), |
|
|
|
|
|
Auth: security.EncodedJwt(assignResult.Auth), |
|
|
|
|
|
}, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Upload with auto-chunking
|
|
|
|
|
|
// Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled
|
|
|
|
|
|
// This prevents partial uploads and data corruption
|
|
|
|
|
|
chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{ |
|
|
|
|
|
ChunkSize: chunkSize, |
|
|
|
|
|
SmallFileLimit: smallFileLimit, |
|
|
|
|
|
Collection: collection, |
|
|
|
|
|
DataCenter: s3a.option.DataCenter, |
|
|
|
|
|
SaveSmallInline: false, // S3 API always creates chunks, never stores inline
|
|
|
|
|
|
MimeType: r.Header.Get("Content-Type"), |
|
|
|
|
|
AssignFunc: assignFunc, |
|
|
|
|
|
}) |
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putToFiler: chunked upload failed: %v", err) |
|
|
|
|
|
if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) { |
|
|
|
|
|
return "", s3err.ErrInvalidDigest, SSEResponseMetadata{} |
|
|
|
|
|
} |
|
|
|
|
|
return "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
glog.Errorf("NewRequest %s: %v", uploadUrl, err) |
|
|
|
|
|
return "", s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 3: Calculate MD5 hash and add SSE metadata to chunks
|
|
|
|
|
|
md5Sum := chunkResult.Md5Hash.Sum(nil) |
|
|
|
|
|
|
|
|
|
|
|
glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d", |
|
|
|
|
|
filePath, len(chunkResult.FileChunks), chunkResult.TotalSize) |
|
|
|
|
|
|
|
|
|
|
|
// Log chunk details for debugging (verbose only - high frequency)
|
|
|
|
|
|
if glog.V(4) { |
|
|
|
|
|
for i, chunk := range chunkResult.FileChunks { |
|
|
|
|
|
glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) |
|
|
|
|
|
if destination != "" { |
|
|
|
|
|
proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Add SSE metadata to all chunks if present
|
|
|
|
|
|
if customerKey != nil { |
|
|
|
|
|
// SSE-C: Create per-chunk metadata (matches filer logic)
|
|
|
|
|
|
for _, chunk := range chunkResult.FileChunks { |
|
|
|
|
|
chunk.SseType = filer_pb.SSEType_SSE_C |
|
|
|
|
|
if len(sseIV) > 0 { |
|
|
|
|
|
// PartOffset tracks position within the encrypted stream
|
|
|
|
|
|
// Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0,
|
|
|
|
|
|
// PartOffset = chunk.Offset represents where this chunk is in that encrypted stream
|
|
|
|
|
|
// - Single-part: chunk.Offset is position in the file's encrypted stream
|
|
|
|
|
|
// - Multipart: chunk.Offset is position in this part's encrypted stream
|
|
|
|
|
|
ssecMetadataStruct := struct { |
|
|
|
|
|
Algorithm string `json:"algorithm"` |
|
|
|
|
|
IV string `json:"iv"` |
|
|
|
|
|
KeyMD5 string `json:"keyMD5"` |
|
|
|
|
|
PartOffset int64 `json:"partOffset"` |
|
|
|
|
|
}{ |
|
|
|
|
|
Algorithm: "AES256", |
|
|
|
|
|
IV: base64.StdEncoding.EncodeToString(sseIV), |
|
|
|
|
|
KeyMD5: customerKey.KeyMD5, |
|
|
|
|
|
PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0)
|
|
|
|
|
|
} |
|
|
|
|
|
if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil { |
|
|
|
|
|
chunk.SseMetadata = ssecMetadata |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} else if sseKMSKey != nil { |
|
|
|
|
|
// SSE-KMS: Store serialized metadata in all chunks
|
|
|
|
|
|
for _, chunk := range chunkResult.FileChunks { |
|
|
|
|
|
chunk.SseType = filer_pb.SSEType_SSE_KMS |
|
|
|
|
|
chunk.SseMetadata = sseKMSMetadata |
|
|
|
|
|
} |
|
|
|
|
|
} else if sseS3Key != nil { |
|
|
|
|
|
// SSE-S3: Store serialized metadata in all chunks
|
|
|
|
|
|
for _, chunk := range chunkResult.FileChunks { |
|
|
|
|
|
chunk.SseType = filer_pb.SSEType_SSE_S3 |
|
|
|
|
|
chunk.SseMetadata = sseS3Metadata |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if s3a.option.FilerGroup != "" { |
|
|
|
|
|
query := proxyReq.URL.Query() |
|
|
|
|
|
query.Add("collection", s3a.getCollectionName(bucket)) |
|
|
|
|
|
proxyReq.URL.RawQuery = query.Encode() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Step 4: Create metadata entry
|
|
|
|
|
|
now := time.Now() |
|
|
|
|
|
mimeType := r.Header.Get("Content-Type") |
|
|
|
|
|
if mimeType == "" { |
|
|
|
|
|
mimeType = "application/octet-stream" |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create entry
|
|
|
|
|
|
entry := &filer_pb.Entry{ |
|
|
|
|
|
Name: filepath.Base(filePath), |
|
|
|
|
|
IsDirectory: false, |
|
|
|
|
|
Attributes: &filer_pb.FuseAttributes{ |
|
|
|
|
|
Crtime: now.Unix(), |
|
|
|
|
|
Mtime: now.Unix(), |
|
|
|
|
|
FileMode: 0660, |
|
|
|
|
|
Uid: 0, |
|
|
|
|
|
Gid: 0, |
|
|
|
|
|
Mime: mimeType, |
|
|
|
|
|
FileSize: uint64(chunkResult.TotalSize), |
|
|
|
|
|
}, |
|
|
|
|
|
Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
|
|
|
|
|
|
Extended: make(map[string][]byte), |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Set Md5 attribute based on context:
|
|
|
|
|
|
// 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5
|
|
|
|
|
|
// - Parts must use simple MD5 ETags, never composite format
|
|
|
|
|
|
// - Even if a part has multiple chunks internally, its ETag is MD5 of entire part
|
|
|
|
|
|
// 2. For regular object uploads: only set Md5 for single-chunk uploads
|
|
|
|
|
|
// - Multi-chunk regular objects use composite "md5-count" format
|
|
|
|
|
|
isMultipartPart := strings.Contains(uploadUrl, "/.uploads/") |
|
|
|
|
|
if isMultipartPart || len(chunkResult.FileChunks) == 1 { |
|
|
|
|
|
entry.Attributes.Md5 = md5Sum |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Calculate ETag using the same logic as GET to ensure consistency
|
|
|
|
|
|
// For single chunk: uses entry.Attributes.Md5
|
|
|
|
|
|
// For multiple chunks: uses filer.ETagChunks() which returns "<hash>-<count>"
|
|
|
|
|
|
etag = filer.ETag(entry) |
|
|
|
|
|
glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks)) |
|
|
|
|
|
|
|
|
|
|
|
// Set object owner
|
|
|
|
|
|
amzAccountId := r.Header.Get(s3_constants.AmzAccountId) |
|
|
|
|
|
if amzAccountId != "" { |
|
|
|
|
|
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) |
|
|
|
|
|
glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath) |
|
|
|
|
|
|
|
|
for header, values := range r.Header { |
|
|
|
|
|
for _, value := range values { |
|
|
|
|
|
proxyReq.Header.Add(header, value) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set version ID if present
|
|
|
|
|
|
if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { |
|
|
|
|
|
entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader) |
|
|
|
|
|
glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath) |
|
|
|
|
|
|
|
|
// Log version ID header for debugging
|
|
|
|
|
|
if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { |
|
|
|
|
|
glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set TTL-based S3 expiry
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") |
|
|
|
|
|
|
|
|
|
|
|
// Copy user metadata and standard headers
|
|
|
|
|
|
for k, v := range r.Header { |
|
|
|
|
|
if len(v) > 0 && len(v[0]) > 0 { |
|
|
|
|
|
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { |
|
|
|
|
|
// Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo)
|
|
|
|
|
|
// We store them as they come in (after canonicalization) to preserve the user's intent
|
|
|
|
|
|
entry.Extended[k] = []byte(v[0]) |
|
|
|
|
|
} else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { |
|
|
|
|
|
entry.Extended[k] = []byte(v[0]) |
|
|
|
|
|
} |
|
|
|
|
|
if k == "Response-Content-Disposition" { |
|
|
|
|
|
entry.Extended["Content-Disposition"] = []byte(v[0]) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Set object owner header for filer to extract
|
|
|
|
|
|
amzAccountId := r.Header.Get(s3_constants.AmzAccountId) |
|
|
|
|
|
if amzAccountId != "" { |
|
|
|
|
|
proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId) |
|
|
|
|
|
glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set SSE-C metadata
|
|
|
|
|
|
|
|
|
// Set SSE-C metadata headers for the filer if encryption was applied
|
|
|
if customerKey != nil && len(sseIV) > 0 { |
|
|
if customerKey != nil && len(sseIV) > 0 { |
|
|
// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV |
|
|
|
|
|
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") |
|
|
|
|
|
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5) |
|
|
|
|
|
glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV)) |
|
|
|
|
|
|
|
|
proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256") |
|
|
|
|
|
proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5) |
|
|
|
|
|
// Store IV in a custom header that the filer can use to store in entry metadata
|
|
|
|
|
|
proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set SSE-KMS metadata
|
|
|
|
|
|
|
|
|
// Set SSE-KMS metadata headers for the filer if KMS encryption was applied
|
|
|
if sseKMSKey != nil { |
|
|
if sseKMSKey != nil { |
|
|
// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata |
|
|
|
|
|
// Set standard SSE headers for detection
|
|
|
|
|
|
entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms") |
|
|
|
|
|
entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID) |
|
|
|
|
|
glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata)) |
|
|
|
|
|
|
|
|
// Use already-serialized SSE-KMS metadata from helper function
|
|
|
|
|
|
// Store serialized KMS metadata in a custom header that the filer can use
|
|
|
|
|
|
proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata)) |
|
|
|
|
|
|
|
|
|
|
|
glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID) |
|
|
|
|
|
} else { |
|
|
|
|
|
glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set SSE-S3 metadata
|
|
|
|
|
|
|
|
|
// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
|
|
|
if sseS3Key != nil && len(sseS3Metadata) > 0 { |
|
|
if sseS3Key != nil && len(sseS3Metadata) > 0 { |
|
|
// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata |
|
|
|
|
|
// Set standard SSE header for detection
|
|
|
|
|
|
entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256") |
|
|
|
|
|
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata)) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Step 4: Save metadata to filer via gRPC
|
|
|
|
|
|
// Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
|
|
|
|
|
|
// This matches the chunk upload behavior and prevents orphaned chunks
|
|
|
|
|
|
glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d", |
|
|
|
|
|
filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended)) |
|
|
|
|
|
createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
|
|
req := &filer_pb.CreateEntryRequest{ |
|
|
|
|
|
Directory: filepath.Dir(filePath), |
|
|
|
|
|
Entry: entry, |
|
|
|
|
|
|
|
|
// Store serialized S3 metadata in a custom header that the filer can use
|
|
|
|
|
|
proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata)) |
|
|
|
|
|
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID) |
|
|
|
|
|
} |
|
|
|
|
|
// Set TTL-based S3 expiry (modification time)
|
|
|
|
|
|
proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true") |
|
|
|
|
|
// ensure that the Authorization header is overriding any previous
|
|
|
|
|
|
// Authorization header which might be already present in proxyReq
|
|
|
|
|
|
s3a.maybeAddFilerJwtAuthorization(proxyReq, true) |
|
|
|
|
|
resp, postErr := s3a.client.Do(proxyReq) |
|
|
|
|
|
|
|
|
|
|
|
if postErr != nil { |
|
|
|
|
|
glog.Errorf("post to filer: %v", postErr) |
|
|
|
|
|
if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) { |
|
|
|
|
|
return "", s3err.ErrInvalidDigest, "" |
|
|
} |
|
|
} |
|
|
glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath) |
|
|
|
|
|
_, err := client.CreateEntry(context.Background(), req) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
glog.Errorf("putToFiler: CreateEntry returned error: %v", err) |
|
|
|
|
|
} |
|
|
|
|
|
return err |
|
|
|
|
|
}) |
|
|
|
|
|
if createErr != nil { |
|
|
|
|
|
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr) |
|
|
|
|
|
return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath) |
|
|
|
|
|
|
|
|
|
|
|
glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d", |
|
|
|
|
|
filePath, etag, entry.Attributes.FileSize, partNumber) |
|
|
|
|
|
|
|
|
defer resp.Body.Close() |
|
|
|
|
|
|
|
|
BucketTrafficReceived(chunkResult.TotalSize, r) |
|
|
|
|
|
|
|
|
etag = fmt.Sprintf("%x", hash.Sum(nil)) |
|
|
|
|
|
|
|
|
// Build SSE response metadata with encryption details
|
|
|
|
|
|
responseMetadata := SSEResponseMetadata{ |
|
|
|
|
|
SSEType: sseResult.SSEType, |
|
|
|
|
|
|
|
|
resp_body, ra_err := io.ReadAll(resp.Body) |
|
|
|
|
|
if ra_err != nil { |
|
|
|
|
|
glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err) |
|
|
|
|
|
return etag, s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata
|
|
|
|
|
|
if sseKMSKey != nil { |
|
|
|
|
|
responseMetadata.KMSKeyID = sseKMSKey.KeyID |
|
|
|
|
|
responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled |
|
|
|
|
|
glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v", |
|
|
|
|
|
sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled) |
|
|
|
|
|
|
|
|
var ret weed_server.FilerPostResult |
|
|
|
|
|
unmarshal_err := json.Unmarshal(resp_body, &ret) |
|
|
|
|
|
if unmarshal_err != nil { |
|
|
|
|
|
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) |
|
|
|
|
|
return "", s3err.ErrInternalError, "" |
|
|
} |
|
|
} |
|
|
|
|
|
if ret.Error != "" { |
|
|
|
|
|
glog.Errorf("upload to filer error: %v", ret.Error) |
|
|
|
|
|
return "", filerErrorToS3Error(ret.Error), "" |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
BucketTrafficReceived(ret.Size, r) |
|
|
|
|
|
|
|
|
return etag, s3err.ErrNone, responseMetadata |
|
|
|
|
|
|
|
|
// Return the SSE type determined by the unified handler
|
|
|
|
|
|
return etag, s3err.ErrNone, sseResult.SSEType |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func setEtag(w http.ResponseWriter, etag string) { |
|
|
func setEtag(w http.ResponseWriter, etag string) { |
|
|
@ -578,43 +384,6 @@ func setEtag(w http.ResponseWriter, etag string) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
|
|
|
|
|
|
func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) { |
|
|
|
|
|
switch sseMetadata.SSEType { |
|
|
|
|
|
case s3_constants.SSETypeS3: |
|
|
|
|
|
// SSE-S3: Return the encryption algorithm
|
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) |
|
|
|
|
|
|
|
|
|
|
|
case s3_constants.SSETypeC: |
|
|
|
|
|
// SSE-C: Echo back the customer-provided algorithm and key MD5
|
|
|
|
|
|
if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo) |
|
|
|
|
|
} |
|
|
|
|
|
if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
case s3_constants.SSETypeKMS: |
|
|
|
|
|
// SSE-KMS: Return the KMS key ID and algorithm
|
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") |
|
|
|
|
|
|
|
|
|
|
|
// Use metadata from stored encryption config (for bucket-default encryption)
|
|
|
|
|
|
// or fall back to request headers (for explicit encryption)
|
|
|
|
|
|
if sseMetadata.KMSKeyID != "" { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID) |
|
|
|
|
|
} else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Set bucket-key-enabled header if it was enabled
|
|
|
|
|
|
if sseMetadata.BucketKeyEnabled { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") |
|
|
|
|
|
} else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" { |
|
|
|
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func filerErrorToS3Error(errString string) s3err.ErrorCode { |
|
|
func filerErrorToS3Error(errString string) s3err.ErrorCode { |
|
|
switch { |
|
|
switch { |
|
|
case errString == constants.ErrMsgBadDigest: |
|
|
case errString == constants.ErrMsgBadDigest: |
|
|
@ -677,18 +446,18 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_ |
|
|
//
|
|
|
//
|
|
|
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
|
|
|
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
|
|
|
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
|
|
|
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
|
|
|
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { |
|
|
// Normalize object path to ensure consistency with toFilerUrl behavior
|
|
|
// Normalize object path to ensure consistency with toFilerUrl behavior
|
|
|
normalizedObject := removeDuplicateSlashes(object) |
|
|
normalizedObject := removeDuplicateSlashes(object) |
|
|
|
|
|
|
|
|
// Enable detailed logging for testobjbar
|
|
|
// Enable detailed logging for testobjbar
|
|
|
isTestObj := (normalizedObject == "testobjbar") |
|
|
isTestObj := (normalizedObject == "testobjbar") |
|
|
|
|
|
|
|
|
glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", |
|
|
bucket, object, normalizedObject, isTestObj) |
|
|
bucket, object, normalizedObject, isTestObj) |
|
|
|
|
|
|
|
|
if isTestObj { |
|
|
if isTestObj { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket |
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket |
|
|
@ -701,20 +470,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
entries, _, err := s3a.list(versionsDir, "", "", false, 1000) |
|
|
entries, _, err := s3a.list(versionsDir, "", "", false, 1000) |
|
|
if err == nil { |
|
|
if err == nil { |
|
|
// .versions directory exists
|
|
|
// .versions directory exists
|
|
|
glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) |
|
|
for _, entry := range entries { |
|
|
for _, entry := range entries { |
|
|
if entry.Extended != nil { |
|
|
if entry.Extended != nil { |
|
|
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { |
|
|
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { |
|
|
versionId := string(versionIdBytes) |
|
|
versionId := string(versionIdBytes) |
|
|
glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) |
|
|
if versionId == "null" { |
|
|
if versionId == "null" { |
|
|
// Only delete null version - preserve real versioned entries
|
|
|
// Only delete null version - preserve real versioned entries
|
|
|
glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions") |
|
|
err := s3a.rm(versionsDir, entry.Name, true, false) |
|
|
err := s3a.rm(versionsDir, entry.Name, true, false) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err) |
|
|
glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err) |
|
|
} else { |
|
|
} else { |
|
|
glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version") |
|
|
} |
|
|
} |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
@ -722,7 +491,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) |
|
|
uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) |
|
|
@ -740,7 +509,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
// Set version ID to "null" for suspended versioning
|
|
|
// Set version ID to "null" for suspended versioning
|
|
|
r.Header.Set(s3_constants.ExtVersionIdKey, "null") |
|
|
r.Header.Set(s3_constants.ExtVersionIdKey, "null") |
|
|
if isTestObj { |
|
|
if isTestObj { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", |
|
|
s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey)) |
|
|
s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey)) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -759,7 +528,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate) |
|
|
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err) |
|
|
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err) |
|
|
return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", s3err.ErrInvalidRequest |
|
|
} |
|
|
} |
|
|
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10)) |
|
|
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10)) |
|
|
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix()) |
|
|
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix()) |
|
|
@ -771,7 +540,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold) |
|
|
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold) |
|
|
} else { |
|
|
} else { |
|
|
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold) |
|
|
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold) |
|
|
return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", s3err.ErrInvalidRequest |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -794,15 +563,15 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
|
|
|
|
|
|
// Upload the file using putToFiler - this will create the file with version metadata
|
|
|
// Upload the file using putToFiler - this will create the file with version metadata
|
|
|
if isTestObj { |
|
|
if isTestObj { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===") |
|
|
} |
|
|
} |
|
|
etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1) |
|
|
|
|
|
|
|
|
etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1) |
|
|
if errCode != s3err.ErrNone { |
|
|
if errCode != s3err.ErrNone { |
|
|
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) |
|
|
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) |
|
|
return "", errCode, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", errCode |
|
|
} |
|
|
} |
|
|
if isTestObj { |
|
|
if isTestObj { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Verify the metadata was set correctly during file creation
|
|
|
// Verify the metadata was set correctly during file creation
|
|
|
@ -812,19 +581,19 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
for attempt := 1; attempt <= maxRetries; attempt++ { |
|
|
for attempt := 1; attempt <= maxRetries; attempt++ { |
|
|
verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject) |
|
|
verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject) |
|
|
if verifyErr == nil { |
|
|
if verifyErr == nil { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) |
|
|
if verifyEntry.Extended != nil { |
|
|
if verifyEntry.Extended != nil { |
|
|
if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok { |
|
|
if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) |
|
|
} else { |
|
|
} else { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") |
|
|
} |
|
|
} |
|
|
break |
|
|
break |
|
|
} else { |
|
|
} else { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) |
|
|
} |
|
|
} |
|
|
if attempt < maxRetries { |
|
|
if attempt < maxRetries { |
|
|
time.Sleep(time.Millisecond * 10) |
|
|
time.Sleep(time.Millisecond * 10) |
|
|
@ -841,9 +610,9 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob |
|
|
|
|
|
|
|
|
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) |
|
|
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) |
|
|
if isTestObj { |
|
|
if isTestObj { |
|
|
glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") |
|
|
|
|
|
|
|
|
glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") |
|
|
} |
|
|
} |
|
|
return etag, s3err.ErrNone, sseMetadata |
|
|
|
|
|
|
|
|
return etag, s3err.ErrNone |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
|
|
|
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
|
|
|
@ -915,7 +684,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { |
|
|
// Generate version ID
|
|
|
// Generate version ID
|
|
|
versionId = generateVersionId() |
|
|
versionId = generateVersionId() |
|
|
|
|
|
|
|
|
@ -940,7 +709,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin |
|
|
}) |
|
|
}) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) |
|
|
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) |
|
|
return "", "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", s3err.ErrInternalError |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
hash := md5.New() |
|
|
hash := md5.New() |
|
|
@ -951,10 +720,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin |
|
|
|
|
|
|
|
|
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) |
|
|
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) |
|
|
|
|
|
|
|
|
etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1) |
|
|
|
|
|
|
|
|
etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1) |
|
|
if errCode != s3err.ErrNone { |
|
|
if errCode != s3err.ErrNone { |
|
|
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) |
|
|
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) |
|
|
return "", "", errCode, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", errCode |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Get the uploaded entry to add versioning metadata
|
|
|
// Get the uploaded entry to add versioning metadata
|
|
|
@ -976,7 +745,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin |
|
|
|
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err) |
|
|
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err) |
|
|
return "", "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", s3err.ErrInternalError |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Add versioning metadata to this version
|
|
|
// Add versioning metadata to this version
|
|
|
@ -997,7 +766,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin |
|
|
// Extract and store object lock metadata from request headers
|
|
|
// Extract and store object lock metadata from request headers
|
|
|
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil { |
|
|
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil { |
|
|
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err) |
|
|
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err) |
|
|
return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", s3err.ErrInvalidRequest |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Update the version entry with metadata
|
|
|
// Update the version entry with metadata
|
|
|
@ -1008,17 +777,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin |
|
|
}) |
|
|
}) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) |
|
|
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) |
|
|
return "", "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", s3err.ErrInternalError |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Update the .versions directory metadata to indicate this is the latest version
|
|
|
// Update the .versions directory metadata to indicate this is the latest version
|
|
|
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName) |
|
|
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) |
|
|
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) |
|
|
return "", "", s3err.ErrInternalError, SSEResponseMetadata{} |
|
|
|
|
|
|
|
|
return "", "", s3err.ErrInternalError |
|
|
} |
|
|
} |
|
|
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) |
|
|
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) |
|
|
return versionId, etag, s3err.ErrNone, sseMetadata |
|
|
|
|
|
|
|
|
return versionId, etag, s3err.ErrNone |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
|
|
|
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
|
|
|
@ -1194,8 +963,7 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ |
|
|
bucketKeyEnabled := encryptionConfig.BucketKeyEnabled |
|
|
bucketKeyEnabled := encryptionConfig.BucketKeyEnabled |
|
|
|
|
|
|
|
|
// Build encryption context for KMS
|
|
|
// Build encryption context for KMS
|
|
|
// Use bucket parameter passed to function (not from request parsing)
|
|
|
|
|
|
_, object := s3_constants.GetBucketAndObject(r) |
|
|
|
|
|
|
|
|
bucket, object := s3_constants.GetBucketAndObject(r) |
|
|
encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled) |
|
|
encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled) |
|
|
|
|
|
|
|
|
// Create SSE-KMS encrypted reader
|
|
|
// Create SSE-KMS encrypted reader
|
|
|
|