
directly read write volume servers

pull/7481/head
chrislu, 1 month ago
commit 7ef058d89a
3 changed files:
1. weed/s3api/s3api_object_handlers.go (186 changes)
2. weed/s3api/s3api_object_handlers_multipart.go (5 changes)
3. weed/s3api/s3api_object_handlers_put.go (297 changes)

weed/s3api/s3api_object_handlers.go (186 changes)

@@ -2,12 +2,15 @@ package s3api
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"mime"
"net/http"
"net/url"
"path/filepath"
"sort"
"strconv"
"strings"
@@ -15,6 +18,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -409,31 +414,178 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
// NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy
// This eliminates the 19ms filer proxy overhead
// Check if this is an SSE object for Range request handling
// This applies to both versioned and non-versioned objects
if originalRangeHeader != "" && objectEntryForSSE != nil {
primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
sseObject = true
// Temporarily remove Range header to get full encrypted data from filer
r.Header.Del("Range")
primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
if originalRangeHeader != "" && (primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS) {
sseObject = true
// Temporarily remove Range header to get full encrypted data
r.Header.Del("Range")
}
// Add SSE response headers before streaming
if objectEntryForSSE != nil {
// Create a fake response to get SSE headers
fakeResp := &http.Response{Header: make(http.Header)}
s3a.addSSEHeadersToResponse(fakeResp, objectEntryForSSE)
// Copy SSE headers to actual response
for k, v := range fakeResp.Header {
if strings.HasPrefix(k, "X-Amz-Server-Side-Encryption") {
w.Header()[k] = v
}
}
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" {
r.Header.Set("Range", originalRangeHeader)
// Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" {
r.Header.Set("Range", originalRangeHeader)
}
// Stream directly from volume servers
err = s3a.streamFromVolumeServers(w, r, objectEntryForSSE, primarySSEType)
if err != nil {
glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
// Don't write error response - headers already sent
return
}
}
// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy
// This eliminates the ~19ms filer proxy overhead by reading chunks directly
func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
if entry == nil {
return fmt.Errorf("entry is nil")
}
// Get file size
totalSize := int64(filer.FileSize(entry))
// Set standard HTTP headers from entry metadata
s3a.setResponseHeaders(w, entry, totalSize)
// For small files stored inline in entry.Content
if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) {
_, err := w.Write(entry.Content)
return err
}
// Get chunks
chunks := entry.GetChunks()
if len(chunks) == 0 {
w.WriteHeader(http.StatusOK)
return nil
}
// Create lookup function via filer client
ctx := r.Context()
lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) {
var urls []string
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
vid := filer.VolumeId(fileId)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
if locs, found := resp.LocationsMap[vid]; found {
for _, loc := range locs.Locations {
urls = append(urls, "http://"+loc.Url+"/"+fileId)
}
}
return nil
})
return urls, err
}
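// Editor's note (illustrative): a SeaweedFS file ID such as "3,01637037d6"
// carries its volume id before the comma, here "3"; if LookupVolume reports a
// location at 10.0.0.5:8080, the loop above yields the direct read URL
// "http://10.0.0.5:8080/3,01637037d6".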
// Resolve chunk manifests
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return fmt.Errorf("failed to resolve chunks: %v", err)
}
// Prepare streaming function with simple master client wrapper
masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
streamFn, err := filer.PrepareStreamContentWithThrottler(
ctx,
masterClient,
func(fileId string) string {
// Use read signing key for volume server auth
return string(security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec))
},
resolvedChunks,
0,
totalSize,
0, // no throttling
)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return fmt.Errorf("failed to prepare stream: %v", err)
}
// Stream directly to response
return streamFn(w)
}
// setResponseHeaders sets all standard HTTP response headers from entry metadata
func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
// Set content length and accept ranges
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
w.Header().Set("Accept-Ranges", "bytes")
// Set ETag
etag := filer.ETag(entry)
if etag != "" {
w.Header().Set("ETag", "\""+etag+"\"")
}
// Set Last-Modified in RFC1123 format
if entry.Attributes != nil {
modTime := time.Unix(entry.Attributes.Mtime, 0).UTC()
w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat))
}
// Set Content-Type
mimeType := ""
if entry.Attributes != nil && entry.Attributes.Mime != "" {
mimeType = entry.Attributes.Mime
}
if mimeType == "" {
// Try to detect from entry name
if entry.Name != "" {
ext := filepath.Ext(entry.Name)
if ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
} else {
w.Header().Set("Content-Type", "application/octet-stream")
}
// Add SSE metadata headers based on object metadata before SSE processing
if objectEntryForSSE != nil {
s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE)
// Set custom headers from entry.Extended (user metadata)
if entry.Extended != nil {
for k, v := range entry.Extended {
// Skip internal SeaweedFS headers
if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) {
w.Header().Set(k, string(v))
}
}
}
}
// Handle SSE decryption (both SSE-C and SSE-KMS) if needed
return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE)
})
// simpleMasterClient implements the minimal interface for streaming
type simpleMasterClient struct {
lookupFn func(ctx context.Context, fileId string) ([]string, error)
}
func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
return s.lookupFn
}
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
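Editor's note: the hunks above replace the filer proxy hop with direct volume-server reads. Below is a minimal standalone sketch of that data path, reduced to plain HTTP, assuming the file ID has already been resolved to a volume server via LookupVolume; the host, file ID, and environment variable are illustrative placeholders, and the commit itself streams through filer.PrepareStreamContentWithThrottler instead.

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Placeholders: in the commit these come from filer_pb.LookupVolume and
	// security.GenJwtForFilerServer.
	volumeServer := "127.0.0.1:8080"
	fileId := "3,01637037d6"
	jwt := os.Getenv("VOLUME_READ_JWT")

	req, err := http.NewRequest(http.MethodGet,
		fmt.Sprintf("http://%s/%s", volumeServer, fileId), nil)
	if err != nil {
		panic(err)
	}
	if jwt != "" {
		// SeaweedFS volume servers accept a JWT via the Authorization header.
		req.Header.Set("Authorization", "BEARER "+jwt)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The object bytes stream straight from the volume server; no filer hop.
	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		panic(err)
	}
}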

weed/s3api/s3api_object_handlers_multipart.go (5 changes)

@@ -401,13 +401,16 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
etag, errCode, _ := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket, partID)
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket, partID)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
setEtag(w, etag)
// Set SSE response headers for multipart uploads
s3a.setSSEResponseHeaders(w, r, sseType)
writeSuccessResponseEmpty(w, r)

weed/s3api/s3api_object_handlers_put.go (297 changes)

@@ -1,25 +1,25 @@
package s3api
import (
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/pquerna/cachecontrol/cacheobject"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
@@ -159,7 +159,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
case s3_constants.VersioningEnabled:
// Handle enabled versioning - create new versions with real version IDs
glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
versionId, etag, errCode, sseType := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
s3err.WriteErrorResponse(w, r, errCode)
@@ -178,9 +178,13 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Set ETag in response
setEtag(w, etag)
// Set SSE response headers for versioned objects
s3a.setSSEResponseHeaders(w, r, sseType)
case s3_constants.VersioningSuspended:
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
etag, errCode, sseType := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@@ -191,6 +195,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Set ETag in response
setEtag(w, etag)
// Set SSE response headers for suspended versioning
s3a.setSSEResponseHeaders(w, r, sseType)
default:
// Handle regular PUT (never configured versioning)
uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -209,9 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers based on encryption type used
if sseType == s3_constants.SSETypeS3 {
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
}
s3a.setSSEResponseHeaders(w, r, sseType)
}
}
stats_collect.RecordBucketActiveTime(bucket)
@@ -221,6 +226,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
}
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
// Calculate unique offset for each part to prevent IV reuse in multipart uploads
// This is critical for CTR mode encryption security
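// Editor's note (illustrative): CTR mode derives keystream block i as
// E(key, IV+i). If every part of a multipart upload began encrypting at
// counter 0 under the same key, two parts would be XORed against the same
// keystream, and XORing their ciphertexts would leak plaintext. A unique
// starting offset per part keeps the counter ranges disjoint.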
partOffset := calculatePartOffset(partNumber)
@@ -270,105 +278,184 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
}
// Parse the upload URL to extract the file path
// uploadUrl format: http://filer:8888/path/to/bucket/object
filePath := strings.TrimPrefix(uploadUrl, "http://"+string(s3a.option.Filer))
// Calculate MD5 hash
hash := md5.New()
var body = io.TeeReader(dataReader, hash)
proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
// Step 1: Assign volume from filer (via gRPC)
var assignResult *filer_pb.AssignVolumeResponse
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
collection := ""
if s3a.option.FilerGroup != "" {
collection = s3a.getCollectionName(bucket)
}
resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: collection,
DiskType: "",
DataCenter: s3a.option.DataCenter,
Path: filePath,
})
if err != nil {
return fmt.Errorf("assign volume: %w", err)
}
if resp.Error != "" {
return fmt.Errorf("assign volume: %v", resp.Error)
}
assignResult = resp
return nil
})
if err != nil {
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
glog.Errorf("putToFiler: failed to assign volume: %v", err)
return "", s3err.ErrInternalError, ""
}
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
if destination != "" {
proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
// Step 2: Upload data directly to volume server
volumeUploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
// Read all data for upload (we need to calculate hash anyway)
data, readErr := io.ReadAll(body)
if readErr != nil {
glog.Errorf("putToFiler: failed to read data: %v", readErr)
return "", s3err.ErrInternalError, ""
}
if s3a.option.FilerGroup != "" {
query := proxyReq.URL.Query()
query.Add("collection", s3a.getCollectionName(bucket))
proxyReq.URL.RawQuery = query.Encode()
// Calculate ETag for S3 API response (hex format)
etag = fmt.Sprintf("%x", hash.Sum(nil))
uploadOption := &operation.UploadOption{
UploadUrl: volumeUploadUrl,
Cipher: false,
IsInputCompressed: false,
MimeType: r.Header.Get("Content-Type"),
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploader, uploaderErr := operation.NewUploader()
if uploaderErr != nil {
glog.Errorf("putToFiler: failed to create uploader: %v", uploaderErr)
return "", s3err.ErrInternalError, ""
}
for header, values := range r.Header {
for _, value := range values {
proxyReq.Header.Add(header, value)
uploadResult, uploadErr := uploader.UploadData(context.Background(), data, uploadOption)
if uploadErr != nil {
glog.Errorf("putToFiler: failed to upload to volume server: %v", uploadErr)
if strings.Contains(uploadErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, ""
}
return "", s3err.ErrInternalError, ""
}
// Log version ID header for debugging
if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
// Step 3: Create metadata entry
now := time.Now()
mimeType := r.Header.Get("Content-Type")
if mimeType == "" {
mimeType = "application/octet-stream"
}
// Set object owner header for filer to extract
// Create file chunk
fid, fidErr := filer_pb.ToFileIdObject(assignResult.FileId)
if fidErr != nil {
glog.Errorf("putToFiler: failed to parse file ID: %v", fidErr)
return "", s3err.ErrInternalError, ""
}
// IMPORTANT: FileChunk.ETag must be base64-encoded (from uploadResult.ContentMd5)
// NOT hex-encoded etag! The filer.ETagChunks() function expects base64.
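// Editor's note, illustrative values for
// data := []byte("The quick brown fox jumps over the lazy dog"), sum := md5.Sum(data):
//   fmt.Sprintf("%x", sum)                    // "9e107d9d372bb6826bd81d3542a419d6" (hex: the S3 ETag)
//   base64.StdEncoding.EncodeToString(sum[:]) // "nhB9nTcrtoJr2B01QqQZ1g==" (base64: FileChunk.ETag)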
fileChunk := &filer_pb.FileChunk{
FileId: assignResult.FileId,
Offset: 0,
Size: uint64(uploadResult.Size),
ETag: uploadResult.ContentMd5, // Base64-encoded MD5 from volume server
Fid: fid,
CipherKey: uploadResult.CipherKey,
}
// Create entry
entry := &filer_pb.Entry{
Name: filepath.Base(filePath),
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Crtime: now.Unix(),
Mtime: now.Unix(),
FileMode: 0660,
Uid: 0,
Gid: 0,
Mime: mimeType,
FileSize: uint64(uploadResult.Size),
},
Chunks: []*filer_pb.FileChunk{fileChunk},
Extended: make(map[string][]byte),
}
// Set object owner
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
}
// Set SSE-C metadata headers for the filer if encryption was applied
if customerKey != nil && len(sseIV) > 0 {
proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
// Store IV in a custom header that the filer can use to store in entry metadata
proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
// Set version ID if present
if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
glog.V(0).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
}
// Set SSE-KMS metadata headers for the filer if KMS encryption was applied
if sseKMSKey != nil {
// Use already-serialized SSE-KMS metadata from helper function
// Store serialized KMS metadata in a custom header that the filer can use
proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
// Set TTL-based S3 expiry
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
} else {
glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
}
// Set SSE-S3 metadata headers for the filer if S3 encryption was applied
if sseS3Key != nil && len(sseS3Metadata) > 0 {
// Store serialized S3 metadata in a custom header that the filer can use
proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
}
// Set TTL-based S3 expiry (modification time)
proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
// ensure that the Authorization header is overriding any previous
// Authorization header which might be already present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, ""
// Copy user metadata and standard headers
for k, v := range r.Header {
if len(v) > 0 && len(v[0]) > 0 {
if strings.HasPrefix(k, "X-Amz-Meta-") || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
entry.Extended[k] = []byte(v[0])
}
if k == "Response-Content-Disposition" {
entry.Extended["Content-Disposition"] = []byte(v[0])
}
}
return "", s3err.ErrInternalError, ""
}
defer resp.Body.Close()
etag = fmt.Sprintf("%x", hash.Sum(nil))
// Set SSE-C metadata
if customerKey != nil && len(sseIV) > 0 {
entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
}
resp_body, ra_err := io.ReadAll(resp.Body)
if ra_err != nil {
glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
return etag, s3err.ErrInternalError, ""
// Set SSE-KMS metadata
if sseKMSKey != nil {
entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader] = sseKMSMetadata
glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", filePath, sseKMSKey.KeyID)
}
var ret weed_server.FilerPostResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
return "", s3err.ErrInternalError, ""
// Set SSE-S3 metadata
if sseS3Key != nil && len(sseS3Metadata) > 0 {
entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", filePath, sseS3Key.KeyID)
}
if ret.Error != "" {
glog.Errorf("upload to filer error: %v", ret.Error)
return "", filerErrorToS3Error(ret.Error), ""
// Step 4: Save metadata to filer via gRPC
createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
req := &filer_pb.CreateEntryRequest{
Directory: filepath.Dir(filePath),
Entry: entry,
}
_, err := client.CreateEntry(context.Background(), req)
return err
})
if createErr != nil {
glog.Errorf("putToFiler: failed to create entry: %v", createErr)
return "", filerErrorToS3Error(createErr.Error()), ""
}
BucketTrafficReceived(ret.Size, r)
BucketTrafficReceived(int64(uploadResult.Size), r)
// Return the SSE type determined by the unified handler
return etag, s3err.ErrNone, sseResult.SSEType
@@ -384,6 +471,34 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseType string) {
switch sseType {
case s3_constants.SSETypeS3:
// SSE-S3: Return the encryption algorithm
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
case s3_constants.SSETypeC:
// SSE-C: Echo back the customer-provided algorithm and key MD5
if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
}
if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
}
case s3_constants.SSETypeKMS:
// SSE-KMS: Return the KMS key ID and algorithm
w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
}
if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
}
}
}
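// Editor's note: for an SSE-KMS PUT the branch above yields response headers
// such as (values illustrative, following AWS S3 semantics):
//   x-amz-server-side-encryption: aws:kms
//   x-amz-server-side-encryption-aws-kms-key-id: <key id echoed from the request>
//   x-amz-server-side-encryption-bucket-key-enabled: true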
func filerErrorToS3Error(errString string) s3err.ErrorCode {
switch {
case errString == constants.ErrMsgBadDigest:
@@ -446,7 +561,7 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
//
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseType string) {
// Normalize object path to ensure consistency with toFilerUrl behavior
normalizedObject := removeDuplicateSlashes(object)
@@ -528,7 +643,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
return "", s3err.ErrInvalidRequest
return "", s3err.ErrInvalidRequest, ""
}
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -540,7 +655,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
} else {
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
return "", s3err.ErrInvalidRequest
return "", s3err.ErrInvalidRequest, ""
}
}
@@ -565,10 +680,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
if isTestObj {
glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
}
etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
etag, errCode, sseType = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
return "", errCode
return "", errCode, ""
}
if isTestObj {
glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
@@ -612,7 +727,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
if isTestObj {
glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
}
return etag, s3err.ErrNone
return etag, s3err.ErrNone, sseType
}
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -684,7 +799,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
return nil
}
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseType string) {
// Generate version ID
versionId = generateVersionId()
@@ -709,7 +824,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
return "", "", s3err.ErrInternalError
return "", "", s3err.ErrInternalError, ""
}
hash := md5.New()
@@ -720,10 +835,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
etag, errCode, sseType = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode
return "", "", errCode, ""
}
// Get the uploaded entry to add versioning metadata
@@ -745,7 +860,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
if err != nil {
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
return "", "", s3err.ErrInternalError
return "", "", s3err.ErrInternalError, ""
}
// Add versioning metadata to this version
@@ -766,7 +881,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
return "", "", s3err.ErrInvalidRequest
return "", "", s3err.ErrInvalidRequest, ""
}
// Update the version entry with metadata
@@ -777,17 +892,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
return "", "", s3err.ErrInternalError
return "", "", s3err.ErrInternalError, ""
}
// Update the .versions directory metadata to indicate this is the latest version
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return "", "", s3err.ErrInternalError
return "", "", s3err.ErrInternalError, ""
}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
return versionId, etag, s3err.ErrNone
return versionId, etag, s3err.ErrNone, sseType
}
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
