@ -24,8 +24,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
)
@ -45,6 +45,18 @@ var corsHeaders = []string{
// Package-level to avoid per-call allocations in writeZeroBytes
// Package-level to avoid per-call allocations in writeZeroBytes
var zeroBuf = make ( [ ] byte , 32 * 1024 )
var zeroBuf = make ( [ ] byte , 32 * 1024 )
// countingWriter is an io.Writer decorator that keeps a running total of the
// bytes the wrapped writer reports as written.
type countingWriter struct {
	// w is the destination every Write call is forwarded to.
	w io.Writer
	// written accumulates the byte counts returned by w.Write.
	written int64
}

// Compile-time check that *countingWriter satisfies io.Writer.
var _ io.Writer = (*countingWriter)(nil)

// Write forwards p to the wrapped writer and returns its result unchanged.
// The byte count reported by the wrapped writer is added to the running total
// before returning, so bytes from a short write that also returned an error
// are still counted.
func (c *countingWriter) Write(p []byte) (n int, err error) {
	n, err = c.w.Write(p)
	c.written += int64(n)
	return n, err
}
// adjustRangeForPart adjusts a client's Range header to absolute offsets within a part.
// adjustRangeForPart adjusts a client's Range header to absolute offsets within a part.
// Parameters:
// Parameters:
// - partStartOffset: the absolute start offset of the part in the object
// - partStartOffset: the absolute start offset of the part in the object
@ -56,6 +68,11 @@ var zeroBuf = make([]byte, 32*1024)
// - adjustedEnd: the adjusted absolute end offset
// - adjustedEnd: the adjusted absolute end offset
// - error: nil on success, error if the range is invalid
// - error: nil on success, error if the range is invalid
func adjustRangeForPart ( partStartOffset , partEndOffset int64 , clientRangeHeader string ) ( adjustedStart , adjustedEnd int64 , err error ) {
func adjustRangeForPart ( partStartOffset , partEndOffset int64 , clientRangeHeader string ) ( adjustedStart , adjustedEnd int64 , err error ) {
// Validate inputs
if partStartOffset > partEndOffset {
return 0 , 0 , fmt . Errorf ( "invalid part boundaries: start %d > end %d" , partStartOffset , partEndOffset )
}
// If no range header, return the full part
// If no range header, return the full part
if clientRangeHeader == "" || ! strings . HasPrefix ( clientRangeHeader , "bytes=" ) {
if clientRangeHeader == "" || ! strings . HasPrefix ( clientRangeHeader , "bytes=" ) {
return partStartOffset , partEndOffset , nil
return partStartOffset , partEndOffset , nil
@ -92,14 +109,15 @@ func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader
}
}
// Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
// Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
// When parts[0] is empty, the parsed clientEnd value represents the suffix length,
// not the actual end position. We compute the actual start/end from the suffix length.
if parts [ 0 ] == "" {
if parts [ 0 ] == "" {
// suffix-range: clientEnd is actually the suffix length
suffixLength := clientEnd
suffixLength := clientEnd // clientEnd temporarily holds the suffix length
if suffixLength > partSize {
if suffixLength > partSize {
suffixLength = partSize
suffixLength = partSize
}
}
clientStart = partSize - suffixLength
clientStart = partSize - suffixLength
clientEnd = partSize - 1
clientEnd = partSize - 1 // Now clientEnd holds the actual end position
}
}
// Validate range is within part boundaries
// Validate range is within part boundaries
@ -201,7 +219,6 @@ func removeDuplicateSlashes(object string) string {
return result . String ( )
return result . String ( )
}
}
// hasChildren checks if a path has any child objects (is a directory with contents)
// hasChildren checks if a path has any child objects (is a directory with contents)
//
//
// This helper function is used to distinguish implicit directories from regular files or empty directories.
// This helper function is used to distinguish implicit directories from regular files or empty directories.
@ -720,13 +737,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// This eliminates the 19ms filer proxy overhead
// This eliminates the 19ms filer proxy overhead
// SSE decryption is handled inline during streaming
// SSE decryption is handled inline during streaming
// Safety check: entry must be valid before streaming
if objectEntryForSSE == nil {
glog . Errorf ( "GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)" , bucket , object )
s3err . WriteErrorResponse ( w , r , s3err . ErrInternalError )
return
}
// Detect SSE encryption type
// Detect SSE encryption type
primarySSEType := s3a . detectPrimarySSEType ( objectEntryForSSE )
primarySSEType := s3a . detectPrimarySSEType ( objectEntryForSSE )
@ -887,13 +897,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
w . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes %d-%d/%d" , offset , offset + size - 1 , totalSize ) )
w . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes %d-%d/%d" , offset , offset + size - 1 , totalSize ) )
w . Header ( ) . Set ( "Content-Length" , strconv . FormatInt ( size , 10 ) )
w . Header ( ) . Set ( "Content-Length" , strconv . FormatInt ( size , 10 ) )
w . WriteHeader ( http . StatusPartialContent )
w . WriteHeader ( http . StatusPartialContent )
_ , err := w . Write ( entry . Content [ start : end ] )
written , err := w . Write ( entry . Content [ start : end ] )
if written > 0 {
BucketTrafficSent ( int64 ( written ) , r )
}
return err
return err
}
}
// Non-range request for inline content
// Non-range request for inline content
s3a . setResponseHeaders ( w , r , entry , totalSize )
s3a . setResponseHeaders ( w , r , entry , totalSize )
w . WriteHeader ( http . StatusOK )
w . WriteHeader ( http . StatusOK )
_ , err := w . Write ( entry . Content )
written , err := w . Write ( entry . Content )
if written > 0 {
BucketTrafficSent ( int64 ( written ) , r )
}
return err
return err
}
}
@ -977,17 +993,22 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
w . WriteHeader ( http . StatusOK )
w . WriteHeader ( http . StatusOK )
}
}
// Stream directly to response
// Stream directly to response with counting wrapper
tStreamExec := time . Now ( )
tStreamExec := time . Now ( )
glog . V ( 4 ) . Infof ( "streamFromVolumeServers: starting streamFn, offset=%d, size=%d" , offset , size )
glog . V ( 4 ) . Infof ( "streamFromVolumeServers: starting streamFn, offset=%d, size=%d" , offset , size )
err = streamFn ( w )
cw := & countingWriter { w : w }
err = streamFn ( cw )
streamExecTime = time . Since ( tStreamExec )
streamExecTime = time . Since ( tStreamExec )
// Track traffic even on partial writes for accurate egress accounting
if cw . written > 0 {
BucketTrafficSent ( cw . written , r )
}
if err != nil {
if err != nil {
glog . Errorf ( "streamFromVolumeServers: streamFn failed: %v" , err )
glog . Errorf ( "streamFromVolumeServers: streamFn failed after writing %d bytes : %v" , cw . written , err )
// Streaming error after WriteHeader was called - response already partially written
// Streaming error after WriteHeader was called - response already partially written
return newStreamErrorWithResponse ( err )
return newStreamErrorWithResponse ( err )
}
}
glog . V ( 4 ) . Infof ( "streamFromVolumeServers: streamFn completed successfully" )
glog . V ( 4 ) . Infof ( "streamFromVolumeServers: streamFn completed successfully, wrote %d bytes " , cw . written )
return nil
return nil
}
}
@ -1189,9 +1210,13 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
if isRangeRequest {
if isRangeRequest {
glog . V ( 2 ) . Infof ( "Using range-aware SSE decryption for offset=%d size=%d" , offset , size )
glog . V ( 2 ) . Infof ( "Using range-aware SSE decryption for offset=%d size=%d" , offset , size )
streamFetchTime = 0 // No full stream fetch in range-aware path
streamFetchTime = 0 // No full stream fetch in range-aware path
err := s3a . streamDecryptedRangeFromChunks ( r . Context ( ) , w , entry , offset , size , sseType , decryptionKey )
written , err := s3a . streamDecryptedRangeFromChunks ( r . Context ( ) , w , entry , offset , size , sseType , decryptionKey )
decryptSetupTime = time . Since ( tDecryptSetup )
decryptSetupTime = time . Since ( tDecryptSetup )
copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path
copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path
// Track traffic even on partial writes for accurate egress accounting
if written > 0 {
BucketTrafficSent ( written , r )
}
if err != nil {
if err != nil {
// Error after WriteHeader - response already written
// Error after WriteHeader - response already written
return newStreamErrorWithResponse ( err )
return newStreamErrorWithResponse ( err )
@ -1338,6 +1363,10 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
buf := make ( [ ] byte , 128 * 1024 )
buf := make ( [ ] byte , 128 * 1024 )
copied , copyErr := io . CopyBuffer ( w , decryptedReader , buf )
copied , copyErr := io . CopyBuffer ( w , decryptedReader , buf )
copyTime = time . Since ( tCopy )
copyTime = time . Since ( tCopy )
// Track traffic even on partial writes for accurate egress accounting
if copied > 0 {
BucketTrafficSent ( copied , r )
}
if copyErr != nil {
if copyErr != nil {
glog . Errorf ( "Failed to copy full object: copied %d bytes: %v" , copied , copyErr )
glog . Errorf ( "Failed to copy full object: copied %d bytes: %v" , copied , copyErr )
// Error after WriteHeader - response already written
// Error after WriteHeader - response already written
@ -1349,7 +1378,8 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks
// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks
// This implements the filer's ViewFromChunks approach for optimal range performance
// This implements the filer's ViewFromChunks approach for optimal range performance
func ( s3a * S3ApiServer ) streamDecryptedRangeFromChunks ( ctx context . Context , w io . Writer , entry * filer_pb . Entry , offset int64 , size int64 , sseType string , decryptionKey interface { } ) error {
// Returns the number of bytes written and any error
func ( s3a * S3ApiServer ) streamDecryptedRangeFromChunks ( ctx context . Context , w io . Writer , entry * filer_pb . Entry , offset int64 , size int64 , sseType string , decryptionKey interface { } ) ( int64 , error ) {
// Use filer's ViewFromChunks to resolve only needed chunks for the range
// Use filer's ViewFromChunks to resolve only needed chunks for the range
lookupFileIdFn := s3a . createLookupFileIdFunction ( )
lookupFileIdFn := s3a . createLookupFileIdFunction ( )
chunkViews := filer . ViewFromChunks ( ctx , lookupFileIdFn , entry . GetChunks ( ) , offset , size )
chunkViews := filer . ViewFromChunks ( ctx , lookupFileIdFn , entry . GetChunks ( ) , offset , size )
@ -1366,7 +1396,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
gap := chunkView . ViewOffset - targetOffset
gap := chunkView . ViewOffset - targetOffset
glog . V ( 4 ) . Infof ( "Writing %d zero bytes for gap [%d,%d)" , gap , targetOffset , chunkView . ViewOffset )
glog . V ( 4 ) . Infof ( "Writing %d zero bytes for gap [%d,%d)" , gap , targetOffset , chunkView . ViewOffset )
if err := writeZeroBytes ( w , gap ) ; err != nil {
if err := writeZeroBytes ( w , gap ) ; err != nil {
return fmt . Errorf ( "failed to write zero padding: %w" , err )
return totalWritten , fmt . Errorf ( "failed to write zero padding: %w" , err )
}
}
totalWritten += gap
totalWritten += gap
targetOffset = chunkView . ViewOffset
targetOffset = chunkView . ViewOffset
@ -1381,7 +1411,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
}
}
}
}
if fileChunk == nil {
if fileChunk == nil {
return fmt . Errorf ( "chunk %s not found in entry" , chunkView . FileId )
return totalWritten , fmt . Errorf ( "chunk %s not found in entry" , chunkView . FileId )
}
}
// Fetch and decrypt this chunk view
// Fetch and decrypt this chunk view
@ -1401,7 +1431,7 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
}
}
if err != nil {
if err != nil {
return fmt . Errorf ( "failed to decrypt chunk view %s: %w" , chunkView . FileId , err )
return totalWritten , fmt . Errorf ( "failed to decrypt chunk view %s: %w" , chunkView . FileId , err )
}
}
// Copy the decrypted chunk data
// Copy the decrypted chunk data
@ -1414,12 +1444,12 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
}
}
if copyErr != nil {
if copyErr != nil {
glog . Errorf ( "streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v" , written , chunkView . ViewSize , copyErr )
glog . Errorf ( "streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v" , written , chunkView . ViewSize , copyErr )
return fmt . Errorf ( "failed to copy decrypted chunk data: %w" , copyErr )
return totalWritten , fmt . Errorf ( "failed to copy decrypted chunk data: %w" , copyErr )
}
}
if written != int64 ( chunkView . ViewSize ) {
if written != int64 ( chunkView . ViewSize ) {
glog . Errorf ( "streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d" , written , chunkView . ViewSize )
glog . Errorf ( "streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d" , written , chunkView . ViewSize )
return fmt . Errorf ( "size mismatch: wrote %d bytes but expected %d for chunk %s" , written , chunkView . ViewSize , chunkView . FileId )
return totalWritten , fmt . Errorf ( "size mismatch: wrote %d bytes but expected %d for chunk %s" , written , chunkView . ViewSize , chunkView . FileId )
}
}
totalWritten += written
totalWritten += written
@ -1432,12 +1462,13 @@ func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io
if remaining > 0 {
if remaining > 0 {
glog . V ( 4 ) . Infof ( "Writing %d trailing zero bytes" , remaining )
glog . V ( 4 ) . Infof ( "Writing %d trailing zero bytes" , remaining )
if err := writeZeroBytes ( w , remaining ) ; err != nil {
if err := writeZeroBytes ( w , remaining ) ; err != nil {
return fmt . Errorf ( "failed to write trailing zeros: %w" , err )
return totalWritten , fmt . Errorf ( "failed to write trailing zeros: %w" , err )
}
}
totalWritten += remaining
}
}
glog . V ( 3 ) . Infof ( "Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)" , totalWritten , offset , offset + size )
glog . V ( 3 ) . Infof ( "Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)" , totalWritten , offset , offset + size )
return nil
return totalWritten , nil
}
}
// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer
// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer