
fixes

pull/7481/head
chrislu 1 month ago
parent commit ec3f7f7fbf
1. weed/s3api/filer_multipart.go (59 changed lines)
2. weed/s3api/s3api_object_handlers.go (163 changed lines)
3. weed/s3api/s3api_object_handlers_multipart.go (18 changed lines)
4. weed/s3api/s3api_object_handlers_put.go (44 changed lines)
5. weed/s3api/s3api_put_handlers.go (15 changed lines)

weed/s3api/filer_multipart.go (59 changed lines)

@@ -251,49 +251,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
continue
}
// Track within-part offset for SSE-KMS IV calculation
var withinPartOffset int64 = 0
for _, chunk := range entry.GetChunks() {
// Update SSE metadata with correct within-part offset (unified approach for KMS and SSE-C)
sseKmsMetadata := chunk.SseMetadata
if chunk.SseType == filer_pb.SSEType_SSE_KMS && len(chunk.SseMetadata) > 0 {
// Deserialize, update offset, and re-serialize SSE-KMS metadata
if kmsKey, err := DeserializeSSEKMSMetadata(chunk.SseMetadata); err == nil {
kmsKey.ChunkOffset = withinPartOffset
if updatedMetadata, serErr := SerializeSSEKMSMetadata(kmsKey); serErr == nil {
sseKmsMetadata = updatedMetadata
glog.V(4).Infof("Updated SSE-KMS metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
}
}
} else if chunk.SseType == filer_pb.SSEType_SSE_C {
// For SSE-C chunks, create per-chunk metadata using the part's IV
if ivDataBase64, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists {
// Decode base64-encoded IV (stored IV is base64, but SerializeSSECMetadata expects raw bytes)
ivData, decodeErr := base64.StdEncoding.DecodeString(string(ivDataBase64))
if decodeErr != nil {
glog.Errorf("Failed to decode SSE-C IV for chunk in part %d: %v", partNumber, decodeErr)
} else {
// Get keyMD5 from entry metadata if available
var keyMD5 string
if keyMD5Data, keyExists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; keyExists {
keyMD5 = string(keyMD5Data)
}
// Create SSE-C metadata with the part's IV and this chunk's within-part offset
if ssecMetadata, serErr := SerializeSSECMetadata(ivData, keyMD5, withinPartOffset); serErr == nil {
sseKmsMetadata = ssecMetadata // Reuse the same field for unified handling
glog.V(4).Infof("Created SSE-C metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
} else {
glog.Errorf("Failed to serialize SSE-C metadata for chunk in part %d: %v", partNumber, serErr)
}
}
} else {
glog.Errorf("SSE-C chunk in part %d missing IV in entry metadata", partNumber)
}
}
// CRITICAL: Do NOT modify SSE metadata offsets during assembly!
// The encrypted data was created with the offset stored in chunk.SseMetadata.
// Changing the offset here would cause decryption to fail because CTR mode
// uses the offset to initialize the counter. We must decrypt with the same
// offset that was used during encryption.
p := &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
Offset: offset,
@@ -302,13 +266,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
CipherKey: chunk.CipherKey,
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
// Preserve SSE metadata with updated within-part offset
// Preserve SSE metadata UNCHANGED - do not modify the offset!
SseType: chunk.SseType,
SseMetadata: sseKmsMetadata,
SseMetadata: chunk.SseMetadata,
}
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
withinPartOffset += int64(chunk.Size)
}
found = true
}
@@ -367,7 +330,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
// SSE-KMS headers
s3_constants.SeaweedFSSSEKMSKeyHeader,
s3_constants.SeaweedFSSSEKMSKey,
s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
// SSE-S3 headers
s3_constants.SeaweedFSSSES3Key,
@@ -448,7 +411,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
// SSE-KMS headers
s3_constants.SeaweedFSSSEKMSKeyHeader,
s3_constants.SeaweedFSSSEKMSKey,
s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
// SSE-S3 headers
s3_constants.SeaweedFSSSES3Key,
@@ -519,7 +482,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
// SSE-KMS headers
s3_constants.SeaweedFSSSEKMSKeyHeader,
s3_constants.SeaweedFSSSEKMSKey,
s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
// SSE-S3 headers
s3_constants.SeaweedFSSSES3Key,
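
The comment added in the first hunk above ("do NOT modify SSE metadata offsets during assembly") comes down to how CTR mode works: the keystream position is derived from the IV plus the byte offset used at encryption time, so decryption must use exactly the offset recorded when the chunk was encrypted. A minimal standalone sketch of that property follows; it is illustrative only, and addOffsetToIV is a hypothetical helper, not a SeaweedFS function.

// Illustrative sketch (not SeaweedFS code): CTR decryption only succeeds when
// the counter starts at the same position used during encryption.
package main

import (
    "bytes"
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "fmt"
)

// addOffsetToIV advances a 16-byte CTR IV by offset/aes.BlockSize blocks,
// mimicking how a byte offset maps to a counter position (hypothetical helper;
// assumes a block-aligned offset).
func addOffsetToIV(iv []byte, offset int64) []byte {
    out := make([]byte, len(iv))
    copy(out, iv)
    blocks := uint64(offset) / aes.BlockSize
    for i := len(out) - 1; i >= 0 && blocks > 0; i-- {
        sum := uint64(out[i]) + (blocks & 0xff)
        out[i] = byte(sum)
        blocks = (blocks >> 8) + (sum >> 8)
    }
    return out
}

func main() {
    key := make([]byte, 32)
    iv := make([]byte, aes.BlockSize)
    rand.Read(key)
    rand.Read(iv)

    plain := []byte("part data encrypted at offset 0")
    block, _ := aes.NewCipher(key)

    // Encrypt at offset 0 (as each multipart part is in the commit above).
    enc := make([]byte, len(plain))
    cipher.NewCTR(block, iv).XORKeyStream(enc, plain)

    // Decrypting with the same counter position recovers the plaintext...
    dec := make([]byte, len(enc))
    cipher.NewCTR(block, iv).XORKeyStream(dec, enc)
    fmt.Println("same offset ok:", bytes.Equal(dec, plain))

    // ...but decrypting as if the data lived at a different offset does not.
    wrong := make([]byte, len(enc))
    cipher.NewCTR(block, addOffsetToIV(iv, 4096)).XORKeyStream(wrong, enc)
    fmt.Println("shifted offset ok:", bytes.Equal(wrong, plain))
}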

weed/s3api/s3api_object_handlers.go (163 changed lines)

@@ -761,13 +761,12 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
decryptionKey = customerKey
case s3_constants.SSETypeKMS:
// Extract KMS key from metadata
// Extract KMS key from metadata (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-KMS metadata")
}
kmsMetadataB64 := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]
kmsMetadataBytes, _ := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -775,13 +774,12 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
decryptionKey = sseKMSKey
case s3_constants.SSETypeS3:
// Extract S3 key from metadata
// Extract S3 key from metadata (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-S3 metadata")
}
keyDataB64 := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyData, _ := base64.StdEncoding.DecodeString(string(keyDataB64))
keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
@@ -814,17 +812,37 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
switch sseType {
case s3_constants.SSETypeC:
customerKey := decryptionKey.(*SSECustomerKey)
// Use storage key (lowercase) not header key for reading from entry.Extended
ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIV])
if ivBase64 == "" {
return fmt.Errorf("SSE-C IV not found in entry metadata")
fmt.Printf("[GET DEBUG] SSE-C decryption - KeyMD5=%s, entry has %d chunks\n", customerKey.KeyMD5, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n",
i, chunk.Offset, chunk.Size, chunk.SseType, len(chunk.SseMetadata) > 0)
}
// Check if this is a multipart object (multiple chunks with SSE-C metadata)
isMultipartSSEC := false
ssecChunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() == filer_pb.SSEType_SSE_C && len(chunk.GetSseMetadata()) > 0 {
ssecChunks++
}
}
iv, ivErr := base64.StdEncoding.DecodeString(ivBase64)
if ivErr != nil {
return fmt.Errorf("failed to decode SSE-C IV: %w", ivErr)
isMultipartSSEC = ssecChunks > 1
fmt.Printf("[GET DEBUG] isMultipartSSEC=%v, ssecChunks=%d\n", isMultipartSSEC, ssecChunks)
if isMultipartSSEC {
// Handle multipart SSE-C objects - each chunk needs independent decryption with its own IV
decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(encryptedReader, customerKey, entry)
glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks", len(entry.GetChunks()))
} else {
// Handle single-part SSE-C objects - use object-level IV
iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
if len(iv) == 0 {
return fmt.Errorf("SSE-C IV not found in entry metadata")
}
glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5)
decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
}
glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5)
decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
case s3_constants.SSETypeKMS:
sseKMSKey := decryptionKey.(*SSEKMSKey)
glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV))
@@ -945,13 +963,10 @@ func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r
case s3_constants.SSETypeKMS:
// SSE-KMS: Return algorithm and key ID
w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
if kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]; exists {
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
if kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err == nil {
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err == nil {
AddSSEKMSResponseHeaders(w, sseKMSKey)
}
AddSSEKMSResponseHeaders(w, sseKMSKey)
}
}
@@ -995,39 +1010,28 @@ func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encr
}
}
// Get IV from entry metadata
// Use storage key (lowercase) not header key for reading from entry.Extended
ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIV])
if ivBase64 == "" {
// Get IV from entry metadata (stored as raw bytes, matching filer behavior)
iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
if len(iv) == 0 {
return nil, fmt.Errorf("SSE-C IV not found in metadata")
}
iv, err := base64.StdEncoding.DecodeString(ivBase64)
if err != nil {
return nil, fmt.Errorf("failed to decode IV: %w", err)
}
// Create decrypted reader
return CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
}
// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-KMS metadata from entry
// Extract SSE-KMS metadata from entry (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]
kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
if !exists {
return nil, fmt.Errorf("SSE-KMS metadata not found")
}
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
if err != nil {
return nil, fmt.Errorf("failed to decode SSE-KMS metadata: %w", err)
}
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
@@ -1039,22 +1043,16 @@ func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, en
// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-S3 metadata from entry
// Extract SSE-S3 metadata from entry (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
keyDataB64, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
if !exists {
return nil, fmt.Errorf("SSE-S3 metadata not found")
}
// Decode from base64 (matches storage format)
keyData, err := base64.StdEncoding.DecodeString(string(keyDataB64))
if err != nil {
return nil, fmt.Errorf("failed to decode SSE-S3 metadata: %w", err)
}
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
@@ -2162,6 +2160,81 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
return "None"
}
// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path)
func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].GetOffset() < chunks[j].GetOffset()
})
// Create readers for each chunk, decrypting them independently
var readers []io.Reader
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
// Handle based on chunk's encryption type
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
// Check if this chunk has per-chunk SSE-C metadata
if len(chunk.GetSseMetadata()) == 0 {
chunkReader.Close()
return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString())
}
// Deserialize the SSE-C metadata
ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
if err != nil {
chunkReader.Close()
return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err)
}
// Decode the IV from the metadata
chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
if err != nil {
chunkReader.Close()
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
}
fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n",
chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
// Note: For multipart SSE-C, each part was encrypted with offset=0
// So we don't need to adjust the IV with PartOffset - just use the stored IV directly
decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
if decErr != nil {
chunkReader.Close()
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
}
// Use the streaming decrypted reader directly
readers = append(readers, struct {
io.Reader
io.Closer
}{
Reader: decryptedChunkReader,
Closer: chunkReader,
})
glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString())
} else {
// Non-SSE-C chunk, use as-is
readers = append(readers, chunkReader)
glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
}
}
// Close the original encrypted stream since we're reading chunks individually
if encryptedStream != nil {
encryptedStream.Close()
}
return NewMultipartSSEReader(readers), nil
}
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
// Entry is passed from caller to avoid redundant filer lookup
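
The new createMultipartSSECDecryptedReaderDirect above sorts chunks by offset, decrypts each one with the IV carried in its per-chunk metadata, and concatenates the plaintext readers. A rough sketch of that pattern, assuming AES-CTR and using io.MultiReader as a stand-in for the commit's NewMultipartSSEReader (which additionally tracks and closes the underlying chunk readers):

// Illustrative sketch of per-chunk SSE-C decryption: each chunk has its own IV,
// was encrypted starting at offset 0, and the plaintext readers are stitched
// together in offset order. Types and names here are hypothetical stand-ins.
package main

import (
    "bytes"
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "fmt"
    "io"
    "sort"
)

// encryptedChunk stands in for filer_pb.FileChunk plus the pieces needed here.
type encryptedChunk struct {
    offset int64
    iv     []byte
    data   io.Reader
}

func multipartDecryptedReader(key []byte, chunks []encryptedChunk) (io.Reader, error) {
    sort.Slice(chunks, func(i, j int) bool { return chunks[i].offset < chunks[j].offset })
    readers := make([]io.Reader, 0, len(chunks))
    for _, c := range chunks {
        block, err := aes.NewCipher(key)
        if err != nil {
            return nil, err
        }
        // Each part was encrypted at offset 0 with its own IV, so the stored IV
        // is used as-is, with no offset adjustment.
        stream := cipher.NewCTR(block, c.iv)
        readers = append(readers, cipher.StreamReader{S: stream, R: c.data})
    }
    // io.MultiReader plays the role of the commit's NewMultipartSSEReader,
    // minus the close-tracking.
    return io.MultiReader(readers...), nil
}

func main() {
    key := make([]byte, 32)
    rand.Read(key)
    block, _ := aes.NewCipher(key)

    var chunks []encryptedChunk
    var off int64
    for _, p := range [][]byte{[]byte("hello, "), []byte("multipart sse-c")} {
        iv := make([]byte, aes.BlockSize)
        rand.Read(iv)
        ct := make([]byte, len(p))
        cipher.NewCTR(block, iv).XORKeyStream(ct, p)
        chunks = append(chunks, encryptedChunk{offset: off, iv: iv, data: bytes.NewReader(ct)})
        off += int64(len(p))
    }

    r, err := multipartDecryptedReader(key, chunks)
    if err != nil {
        panic(err)
    }
    plain, _ := io.ReadAll(r)
    fmt.Printf("%s\n", plain)
}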

weed/s3api/s3api_object_handlers_multipart.go (18 changed lines)

@@ -308,6 +308,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
dataReader, s3ErrCode := getRequestDataReader(s3a, r)
if s3ErrCode != s3err.ErrNone {
glog.Errorf("PutObjectPartHandler: getRequestDataReader failed with code %v", s3ErrCode)
s3err.WriteErrorResponse(w, r, s3ErrCode)
return
}
@@ -358,12 +359,16 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
if len(baseIV) == 0 {
fmt.Printf("[SSE-KMS] No valid base IV found for SSE-KMS multipart upload %s\n", uploadID)
glog.Errorf("No valid base IV found for SSE-KMS multipart upload %s", uploadID)
// Generate a new base IV as fallback
baseIV = make([]byte, 16)
if _, err := rand.Read(baseIV); err != nil {
fmt.Printf("[SSE-KMS] Failed to generate fallback base IV: %v\n", err)
glog.Errorf("Failed to generate fallback base IV: %v", err)
}
} else {
fmt.Printf("[SSE-KMS] Using base IV for multipart upload %s\n", uploadID)
}
// Add SSE-KMS headers to the request for putToFiler to handle encryption
@@ -384,6 +389,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
} else {
// Check if this upload uses SSE-S3
if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil {
fmt.Printf("[SSE-S3] Failed to setup SSE-S3 multipart headers: %v\n", err)
glog.Errorf("Failed to setup SSE-S3 multipart headers: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
@@ -401,22 +407,24 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
bucket, object, uploadID, partID, r.ContentLength)
fmt.Printf("[PutObjectPartHandler] About to call putToFiler - bucket=%s, object=%s, partID=%d\n", bucket, object, partID)
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket, partID)
fmt.Printf("[PutObjectPartHandler] putToFiler returned - errCode=%v\n", errCode)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
errCode, bucket, object, partID)
s3err.WriteErrorResponse(w, r, errCode)
return
}
glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
bucket, object, partID, etag, sseType)
setEtag(w, etag)
// Set SSE response headers for multipart uploads
s3a.setSSEResponseHeaders(w, r, sseType)
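
Earlier in this file's diff, the SSE-KMS part path reuses the base IV recorded when the multipart upload was created and only generates a random IV as a last-resort fallback. A small sketch of that fallback, with hypothetical helper names:

// Sketch of the fallback: reuse the stored base IV when present, otherwise
// generate a fresh random 16-byte IV. Names are hypothetical.
package main

import (
    "crypto/rand"
    "encoding/base64"
    "fmt"
)

func ensureBaseIV(stored []byte) ([]byte, error) {
    if len(stored) == 16 {
        // Reuse the base IV recorded at CreateMultipartUpload time.
        return stored, nil
    }
    iv := make([]byte, 16)
    if _, err := rand.Read(iv); err != nil {
        return nil, fmt.Errorf("generate fallback base IV: %w", err)
    }
    return iv, nil
}

func main() {
    iv, err := ensureBaseIV(nil)
    if err != nil {
        panic(err)
    }
    // The handler forwards the IV base64-encoded in a request header for putToFiler.
    fmt.Println(base64.StdEncoding.EncodeToString(iv))
}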

weed/s3api/s3api_object_handlers_put.go (44 changed lines)

@@ -232,16 +232,21 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
fmt.Printf("[putToFiler] ENTRY - uploadUrl=%s, partNumber=%d\n", uploadUrl, partNumber)
// Calculate unique offset for each part to prevent IV reuse in multipart uploads
// This is critical for CTR mode encryption security
partOffset := calculatePartOffset(partNumber)
// For SSE, encrypt with offset=0 for all parts
// Each part is encrypted independently, then decrypted using metadata during GET
partOffset := int64(0)
// Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
// Handle all SSE encryption types in a unified manner
fmt.Printf("[putToFiler] Calling handleAllSSEEncryption - partNumber=%d, partOffset=%d\n", partNumber, partOffset)
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
if sseErrorCode != s3err.ErrNone {
fmt.Printf("[putToFiler] handleAllSSEEncryption FAILED with error code %v\n", sseErrorCode)
return "", sseErrorCode, ""
}
fmt.Printf("[putToFiler] handleAllSSEEncryption SUCCESS - hasCustomerKey=%v, hasKMSKey=%v, hasS3Key=%v\n",
sseResult.CustomerKey != nil, sseResult.SSEKMSKey != nil, sseResult.SSES3Key != nil)
// Extract results from unified SSE handling
dataReader = sseResult.DataReader
@@ -361,16 +366,17 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
return "", s3err.ErrInternalError, ""
}
glog.V(0).Infof("putToFiler: Uploading to volume server - fileId=%s", assignResult.FileId)
uploadResult, uploadErr := uploader.UploadData(context.Background(), data, uploadOption)
if uploadErr != nil {
glog.Errorf("putToFiler: failed to upload to volume server: %v", uploadErr)
glog.Errorf("putToFiler: failed to upload to volume server for %s: %v", filePath, uploadErr)
if strings.Contains(uploadErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, ""
}
return "", s3err.ErrInternalError, ""
}
glog.V(3).Infof("putToFiler: Volume upload SUCCESS - fileId=%s, size=%d, md5(base64)=%s",
glog.V(0).Infof("putToFiler: Volume upload SUCCESS - fileId=%s, size=%d, md5(base64)=%s",
assignResult.FileId, uploadResult.Size, uploadResult.ContentMd5)
// Step 3: Create metadata entry
@@ -496,44 +502,52 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
// Set SSE-C metadata
if customerKey != nil && len(sseIV) > 0 {
// Use helper function to store IV as base64 (matches filer behavior)
StoreSSECIVInMetadata(entry.Extended, sseIV)
// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
glog.V(0).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV))
}
// Set SSE-KMS metadata
if sseKMSKey != nil {
// Store metadata as base64 (matches filer behavior and response reading expectation)
entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader] = []byte(base64.StdEncoding.EncodeToString(sseKMSMetadata))
// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
// Set standard SSE headers for detection
entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", filePath, sseKMSKey.KeyID)
glog.V(0).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
}
// Set SSE-S3 metadata
if sseS3Key != nil && len(sseS3Metadata) > 0 {
// Store metadata as base64 (matches filer behavior)
entry.Extended[s3_constants.SeaweedFSSSES3Key] = []byte(base64.StdEncoding.EncodeToString(sseS3Metadata))
// Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
// Set standard SSE header for detection
entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", filePath, sseS3Key.KeyID)
glog.V(0).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
}
// Step 4: Save metadata to filer via gRPC
glog.V(0).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
req := &filer_pb.CreateEntryRequest{
Directory: filepath.Dir(filePath),
Entry: entry,
}
glog.V(0).Infof("putToFiler: Calling CreateEntry for %s", filePath)
_, err := client.CreateEntry(context.Background(), req)
if err != nil {
glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
}
return err
})
if createErr != nil {
glog.Errorf("putToFiler: failed to create entry: %v", createErr)
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
return "", filerErrorToS3Error(createErr.Error()), ""
}
glog.V(0).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
filePath, etag, entry.Attributes.FileSize, partNumber)
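
The recurring change in this file is the storage convention for SSE metadata: entry.Extended now holds the serialized metadata as raw bytes rather than base64 text, so the read paths shown earlier can pass the value straight to the deserializers. A toy sketch of that write/read symmetry, with a hypothetical entry type and constant value standing in for filer_pb.Entry and the s3_constants key:

// Toy sketch of the raw-bytes convention: write serialized SSE metadata into
// Extended unchanged, and read it back without a base64 round-trip. The entry
// type and the constant's value are hypothetical stand-ins.
package main

import "fmt"

const seaweedFSSSEKMSKey = "x-seaweedfs-sse-kms-key" // hypothetical value

type entry struct {
    Extended map[string][]byte
}

// Write path: store serialized SSE-KMS metadata as raw bytes.
func storeSSEKMSMetadata(e *entry, serialized []byte) {
    if e.Extended == nil {
        e.Extended = map[string][]byte{}
    }
    e.Extended[seaweedFSSSEKMSKey] = serialized
}

// Read path: hand the bytes straight to the deserializer; no decode step.
func loadSSEKMSMetadata(e *entry) ([]byte, bool) {
    b, ok := e.Extended[seaweedFSSSEKMSKey]
    return b, ok
}

func main() {
    e := &entry{}
    storeSSEKMSMetadata(e, []byte(`{"keyId":"example"}`))
    if b, ok := loadSSEKMSMetadata(e); ok {
        fmt.Printf("stored %d raw bytes\n", len(b))
    }
}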

weed/s3api/s3api_put_handlers.go (15 changed lines)

@@ -2,6 +2,7 @@ package s3api
import (
"encoding/base64"
"fmt"
"io"
"net/http"
"strings"
@@ -100,20 +101,32 @@ func (s3a *S3ApiServer) handleSSEKMSEncryption(r *http.Request, dataReader io.Re
if baseIVHeader != "" {
// Decode the base IV from the header
baseIV, decodeErr := base64.StdEncoding.DecodeString(baseIVHeader)
if decodeErr != nil || len(baseIV) != 16 {
if decodeErr != nil {
glog.Errorf("handleSSEKMSEncryption: Failed to decode base IV: %v", decodeErr)
fmt.Printf("[SSE-KMS DEBUG] Failed to decode base IV: %v\n", decodeErr)
return nil, nil, nil, s3err.ErrInternalError
}
if len(baseIV) != 16 {
glog.Errorf("handleSSEKMSEncryption: Invalid base IV length: %d (expected 16)", len(baseIV))
fmt.Printf("[SSE-KMS DEBUG] Invalid base IV length: %d\n", len(baseIV))
return nil, nil, nil, s3err.ErrInternalError
}
// Use the provided base IV with unique part offset for multipart upload consistency
fmt.Printf("[SSE-KMS DEBUG] Creating encrypted reader with baseIV=%x, partOffset=%d\n", baseIV[:8], partOffset)
encryptedReader, sseKey, encErr = CreateSSEKMSEncryptedReaderWithBaseIVAndOffset(dataReader, keyID, encryptionContext, bucketKeyEnabled, baseIV, partOffset)
glog.V(4).Infof("Using provided base IV %x for SSE-KMS encryption", baseIV[:8])
} else {
// Generate a new IV for single-part uploads
fmt.Printf("[SSE-KMS DEBUG] Creating encrypted reader for single-part (no base IV)\n")
encryptedReader, sseKey, encErr = CreateSSEKMSEncryptedReaderWithBucketKey(dataReader, keyID, encryptionContext, bucketKeyEnabled)
}
if encErr != nil {
glog.Errorf("handleSSEKMSEncryption: Encryption failed: %v", encErr)
fmt.Printf("[SSE-KMS DEBUG] Encryption failed: %v\n", encErr)
return nil, nil, nil, s3err.ErrInternalError
}
fmt.Printf("[SSE-KMS DEBUG] Encryption successful, keyID=%s\n", keyID)
// Prepare SSE-KMS metadata for later header setting
sseKMSMetadata, metaErr := SerializeSSEKMSMetadata(sseKey)
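
The stricter validation above separates a base64 decode failure from a wrong-length IV. A compact sketch of that check, assuming the base IV travels base64-encoded in an internal request header (the header name here is hypothetical):

// Sketch of the base-IV header check: decode errors and wrong-length IVs are
// reported separately. The header name is hypothetical.
package main

import (
    "encoding/base64"
    "fmt"
    "net/http"
)

const sseKMSBaseIVHeader = "X-SeaweedFS-SSE-KMS-Base-IV" // hypothetical name

func baseIVFromRequest(r *http.Request) ([]byte, error) {
    h := r.Header.Get(sseKMSBaseIVHeader)
    if h == "" {
        // No base IV: single-part upload, caller generates a fresh IV instead.
        return nil, nil
    }
    iv, err := base64.StdEncoding.DecodeString(h)
    if err != nil {
        return nil, fmt.Errorf("decode base IV: %w", err)
    }
    if len(iv) != 16 {
        return nil, fmt.Errorf("invalid base IV length %d, expected 16", len(iv))
    }
    return iv, nil
}

func main() {
    r, _ := http.NewRequest("PUT", "http://example.invalid/bucket/key?partNumber=2", nil)
    r.Header.Set(sseKMSBaseIVHeader, base64.StdEncoding.EncodeToString(make([]byte, 16)))
    iv, err := baseIVFromRequest(r)
    fmt.Println(len(iv), err)
}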
