Browse Source

range read for sse content

pull/7481/head
chrislu 3 weeks ago
parent
commit
7eae9c3ba0
  1. 135
      weed/s3api/s3api_object_handlers.go

135
weed/s3api/s3api_object_handlers.go

@ -767,6 +767,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Profiling: Track SSE decryption stages
t0 := time.Now()
var (
rangeParseTime time.Duration
keyValidateTime time.Duration
headerSetTime time.Duration
streamFetchTime time.Duration
@ -775,12 +776,81 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
)
defer func() {
totalTime := time.Since(t0)
glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v",
sseType, totalTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime)
glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, rangeParse=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v",
sseType, totalTime, rangeParseTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime)
}()
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType)
// Parse Range header BEFORE key validation
totalSize := int64(filer.FileSize(entry))
tRangeParse := time.Now()
var offset int64 = 0
var size int64 = totalSize
rangeHeader := r.Header.Get("Range")
isRangeRequest := false
if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") {
rangeSpec := rangeHeader[6:]
parts := strings.Split(rangeSpec, "-")
if len(parts) == 2 {
var startOffset, endOffset int64
if parts[0] == "" && parts[1] != "" {
// Suffix range: bytes=-N (last N bytes)
if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
if suffixLen > totalSize {
suffixLen = totalSize
}
startOffset = totalSize - suffixLen
endOffset = totalSize - 1
} else {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
return fmt.Errorf("invalid suffix range")
}
} else {
// Regular range or open-ended range
startOffset = 0
endOffset = totalSize - 1
if parts[0] != "" {
if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
startOffset = parsed
}
}
if parts[1] != "" {
if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
endOffset = parsed
}
}
// Validate range
if startOffset < 0 || startOffset >= totalSize {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
return fmt.Errorf("invalid range start")
}
if endOffset >= totalSize {
endOffset = totalSize - 1
}
if endOffset < startOffset {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
return fmt.Errorf("invalid range: end before start")
}
}
offset = startOffset
size = endOffset - startOffset + 1
isRangeRequest = true
glog.V(2).Infof("streamFromVolumeServersWithSSE: Range request bytes %d-%d/%d (size=%d)", startOffset, endOffset, totalSize, size)
}
}
rangeParseTime = time.Since(tRangeParse)
// Validate SSE keys BEFORE streaming
tKeyValidate := time.Now()
var decryptionKey interface{}
@ -836,9 +906,25 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Set response headers
tHeaderSet := time.Now()
totalSize := int64(filer.FileSize(entry))
s3a.setResponseHeaders(w, entry, totalSize)
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
if isRangeRequest {
// Set range-specific headers
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
// Set basic headers without Content-Length override
if etag := filer.ETag(entry); etag != "" {
w.Header().Set("ETag", "\""+etag+"\"")
}
if entry.Attributes != nil {
modTime := time.Unix(entry.Attributes.Mtime, 0).UTC()
w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat))
}
w.Header().Set("Accept-Ranges", "bytes")
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
w.WriteHeader(http.StatusPartialContent)
} else {
s3a.setResponseHeaders(w, entry, totalSize)
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
}
headerSetTime = time.Since(tHeaderSet)
// Get encrypted data stream (without headers)
@ -857,12 +943,6 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
case s3_constants.SSETypeC:
customerKey := decryptionKey.(*SSECustomerKey)
fmt.Printf("[GET DEBUG] SSE-C decryption - KeyMD5=%s, entry has %d chunks\n", customerKey.KeyMD5, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n",
i, chunk.Offset, chunk.Size, chunk.SseType, len(chunk.SseMetadata) > 0)
}
// Check if this is a multipart object (multiple chunks with SSE-C metadata)
isMultipartSSEC := false
ssecChunks := 0
@ -872,7 +952,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
}
isMultipartSSEC = ssecChunks > 1
fmt.Printf("[GET DEBUG] isMultipartSSEC=%v, ssecChunks=%d\n", isMultipartSSEC, ssecChunks)
glog.V(3).Infof("SSE-C decryption: KeyMD5=%s, entry has %d chunks, isMultipart=%v, ssecChunks=%d",
customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks)
if isMultipartSSEC {
// Handle multipart SSE-C objects - each chunk needs independent decryption with its own IV
@ -899,7 +980,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
}
isMultipartSSEKMS = ssekmsChunks > 1
fmt.Printf("[GET DEBUG] SSE-KMS: isMultipart=%v, chunks=%d\n", isMultipartSSEKMS, ssekmsChunks)
glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks)
if isMultipartSSEKMS {
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
@ -930,9 +1011,27 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// Stream decrypted data to client
tCopy := time.Now()
buf := make([]byte, 128*1024)
_, copyErr := io.CopyBuffer(w, decryptedReader, buf)
copyTime = time.Since(tCopy)
return copyErr
if isRangeRequest {
// For range requests, skip to offset and copy only requested size
// Note: This currently decrypts the full object then seeks - future optimization
// would decrypt only the requested range using CTR mode offset calculation
if offset > 0 {
_, err := io.CopyN(io.Discard, decryptedReader, offset)
if err != nil {
glog.Errorf("Failed to seek to range offset %d: %v", offset, err)
return fmt.Errorf("failed to seek to range offset: %w", err)
}
}
_, copyErr := io.CopyN(w, decryptedReader, size)
copyTime = time.Since(tCopy)
return copyErr
} else {
// Full object request
_, copyErr := io.CopyBuffer(w, decryptedReader, buf)
copyTime = time.Since(tCopy)
return copyErr
}
}
// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers
@ -2300,7 +2399,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
}
fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n",
glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d",
chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
// Note: For multipart SSE-C, each part was encrypted with offset=0
@ -2368,7 +2467,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
}
fmt.Printf("[GET DEBUG] Decrypting SSE-KMS chunk %s with KeyID=%s\n",
glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s",
chunk.GetFileIdString(), kmsKey.KeyID)
// Create decrypted reader for this chunk

Loading…
Cancel
Save