Browse Source

minor fixes

pull/7481/head
chrislu 3 weeks ago
parent
commit
3f21f8b22f
  1. 25
      weed/operation/upload_chunked.go
  2. 113
      weed/s3api/s3api_object_handlers.go
  3. 2
      weed/s3api/s3api_object_handlers_list.go

25
weed/operation/upload_chunked.go

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"hash" "hash"
"io" "io"
"sort"
"sync" "sync"
"time" "time"
@ -73,6 +74,19 @@ func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUpl
} }
uploadErrLock.Unlock() uploadErrLock.Unlock()
// Check for context cancellation
select {
case <-ctx.Done():
<-bytesBufferLimitChan
uploadErrLock.Lock()
if uploadErr == nil {
uploadErr = ctx.Err()
}
uploadErrLock.Unlock()
break
default:
}
// Get buffer from pool // Get buffer from pool
bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer) bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer)
limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize)) limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize))
@ -212,14 +226,9 @@ func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUpl
} }
// Sort chunks by offset // Sort chunks by offset
// Note: We could use slices.SortFunc here, but keeping it simple for Go 1.20 compatibility
for i := 0; i < len(fileChunks); i++ {
for j := i + 1; j < len(fileChunks); j++ {
if fileChunks[i].Offset > fileChunks[j].Offset {
fileChunks[i], fileChunks[j] = fileChunks[j], fileChunks[i]
}
}
}
sort.Slice(fileChunks, func(i, j int) bool {
return fileChunks[i].Offset < fileChunks[j].Offset
})
return &ChunkedUploadResult{ return &ChunkedUploadResult{
FileChunks: fileChunks, FileChunks: fileChunks,

113
weed/s3api/s3api_object_handlers.go

@ -706,7 +706,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
start := int(offset) start := int(offset)
end := start + int(size) end := start + int(size)
// Bounds check (should already be validated, but double-check) // Bounds check (should already be validated, but double-check)
if start < 0 || end > len(entry.Content) || end < start {
if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start {
return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)) return fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content))
} }
_, err := w.Write(entry.Content[start:end]) _, err := w.Write(entry.Content[start:end])
@ -1267,43 +1267,32 @@ func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *fil
return nil, fmt.Errorf("failed to decode IV: %w", err) return nil, fmt.Errorf("failed to decode IV: %w", err)
} }
// Fetch FULL encrypted chunk
// Note: Fetching full chunk is necessary for proper CTR decryption stream
fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
// Fetch only the needed encrypted bytes for this view
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
return nil, fmt.Errorf("failed to fetch chunk view: %w", err)
} }
// Calculate IV using PartOffset
// PartOffset is the position of this chunk within its part's encrypted stream
// Calculate IV using both PartOffset and OffsetInChunk for CTR seek
// CTR mode allows seeking by adjusting the IV to any byte offset
absoluteOffset := ssecMetadata.PartOffset + chunkView.OffsetInChunk
var adjustedIV []byte var adjustedIV []byte
if ssecMetadata.PartOffset > 0 {
adjustedIV = adjustCTRIV(chunkIV, ssecMetadata.PartOffset)
if absoluteOffset > 0 {
adjustedIV = adjustCTRIV(chunkIV, absoluteOffset)
} else { } else {
adjustedIV = chunkIV adjustedIV = chunkIV
} }
// Decrypt the full chunk
decryptedReader, decryptErr := CreateSSECDecryptedReader(fullChunkReader, customerKey, adjustedIV)
// Decrypt from the correct offset
decryptedReader, decryptErr := CreateSSECDecryptedReader(encryptedReader, customerKey, adjustedIV)
if decryptErr != nil { if decryptErr != nil {
fullChunkReader.Close()
encryptedReader.Close()
return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr) return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr)
} }
// Skip to the position we need in the decrypted stream
if chunkView.OffsetInChunk > 0 {
_, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
if err != nil {
if closer, ok := decryptedReader.(io.Closer); ok {
closer.Close()
}
return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err)
}
}
// Return a reader that only reads ViewSize bytes with proper cleanup
// Return a reader that only reads ViewSize bytes
limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
return &rc{Reader: limitedReader, Closer: encryptedReader}, nil
} }
// Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it)
@ -1323,16 +1312,18 @@ func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *f
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
} }
// Fetch FULL encrypted chunk
fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
// Fetch only the needed encrypted bytes for this view
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
return nil, fmt.Errorf("failed to fetch chunk view: %w", err)
} }
// Calculate IV using ChunkOffset (same as PartOffset in SSE-C)
// Calculate IV using both ChunkOffset and OffsetInChunk for CTR seek
// CTR mode allows seeking by adjusting the IV to any byte offset
absoluteOffset := sseKMSKey.ChunkOffset + chunkView.OffsetInChunk
var adjustedIV []byte var adjustedIV []byte
if sseKMSKey.ChunkOffset > 0 {
adjustedIV = adjustCTRIV(sseKMSKey.IV, sseKMSKey.ChunkOffset)
if absoluteOffset > 0 {
adjustedIV = adjustCTRIV(sseKMSKey.IV, absoluteOffset)
} else { } else {
adjustedIV = sseKMSKey.IV adjustedIV = sseKMSKey.IV
} }
@ -1343,28 +1334,18 @@ func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *f
EncryptionContext: sseKMSKey.EncryptionContext, EncryptionContext: sseKMSKey.EncryptionContext,
BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, BucketKeyEnabled: sseKMSKey.BucketKeyEnabled,
IV: adjustedIV, IV: adjustedIV,
ChunkOffset: sseKMSKey.ChunkOffset,
ChunkOffset: absoluteOffset,
} }
decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey)
decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(encryptedReader, adjustedKey)
if decryptErr != nil { if decryptErr != nil {
fullChunkReader.Close()
encryptedReader.Close()
return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr) return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr)
} }
// Skip to position and limit to ViewSize
if chunkView.OffsetInChunk > 0 {
_, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
if err != nil {
if closer, ok := decryptedReader.(io.Closer); ok {
closer.Close()
}
return nil, fmt.Errorf("failed to skip to offset: %w", err)
}
}
// Return a reader that only reads ViewSize bytes
limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
return &rc{Reader: limitedReader, Closer: encryptedReader}, nil
} }
// Non-KMS encrypted chunk // Non-KMS encrypted chunk
@ -1381,38 +1362,36 @@ func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *fi
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
} }
// Fetch FULL encrypted chunk
fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
// Fetch only the needed encrypted bytes for this view
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
return nil, fmt.Errorf("failed to fetch chunk view: %w", err)
} }
// Get base IV and use it directly (no offset adjustment for full chunk)
iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
// Get base IV
baseIV, err := GetSSES3IV(entry, sseS3Key, keyManager)
if err != nil { if err != nil {
fullChunkReader.Close()
encryptedReader.Close()
return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
} }
decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv)
if decryptErr != nil {
fullChunkReader.Close()
return nil, fmt.Errorf("failed to create S3 decrypted reader: %w", decryptErr)
// Adjust IV for CTR seek if needed
var adjustedIV []byte
if chunkView.OffsetInChunk > 0 {
adjustedIV = adjustCTRIV(baseIV, chunkView.OffsetInChunk)
} else {
adjustedIV = baseIV
} }
// Skip to position and limit to ViewSize
if chunkView.OffsetInChunk > 0 {
_, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
if err != nil {
if closer, ok := decryptedReader.(io.Closer); ok {
closer.Close()
}
return nil, fmt.Errorf("failed to skip to offset: %w", err)
}
decryptedReader, decryptErr := CreateSSES3DecryptedReader(encryptedReader, sseS3Key, adjustedIV)
if decryptErr != nil {
encryptedReader.Close()
return nil, fmt.Errorf("failed to create S3 decrypted reader: %w", decryptErr)
} }
// Return a reader that only reads ViewSize bytes
limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
return &rc{Reader: limitedReader, Closer: encryptedReader}, nil
} }
// adjustCTRIV adjusts the IV for CTR mode based on byte offset // adjustCTRIV adjusts the IV for CTR mode based on byte offset
@ -2068,7 +2047,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// Detect and handle SSE // Detect and handle SSE
glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks)) glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks))
sseType := s3a.detectPrimarySSEType(objectEntryForSSE) sseType := s3a.detectPrimarySSEType(objectEntryForSSE)
glog.V(0).Infof("HeadObjectHandler: Detected SSE type: %s", sseType)
glog.V(2).Infof("HeadObjectHandler: Detected SSE type: %s", sseType)
if sseType != "" && sseType != "None" { if sseType != "" && sseType != "None" {
// Validate SSE headers for encrypted objects // Validate SSE headers for encrypted objects
switch sseType { switch sseType {

2
weed/s3api/s3api_object_handlers_list.go

@ -358,7 +358,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
// Sort CommonPrefixes to match AWS S3 behavior // Sort CommonPrefixes to match AWS S3 behavior
// AWS S3 treats the delimiter character as having lower priority than other characters // AWS S3 treats the delimiter character as having lower priority than other characters
// For example with delimiter '/', 'foo/' comes before 'foo+1/' even though '+' (ASCII 43) < '/' (ASCII 47) // For example with delimiter '/', 'foo/' comes before 'foo+1/' even though '+' (ASCII 43) < '/' (ASCII 47)
// Sorting happens on decoded values for correct lexicographic order
// Sorting happens on unencoded (raw) values for correct lexicographic order
sort.Slice(response.CommonPrefixes, func(i, j int) bool { sort.Slice(response.CommonPrefixes, func(i, j int) bool {
return compareWithDelimiter(response.CommonPrefixes[i].Prefix, response.CommonPrefixes[j].Prefix, delimiter) return compareWithDelimiter(response.CommonPrefixes[i].Prefix, response.CommonPrefixes[j].Prefix, delimiter)
}) })

Loading…
Cancel
Save