|
|
@ -439,29 +439,58 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R |
|
|
rangeSpec := rangeHeader[6:] |
|
|
rangeSpec := rangeHeader[6:] |
|
|
parts := strings.Split(rangeSpec, "-") |
|
|
parts := strings.Split(rangeSpec, "-") |
|
|
if len(parts) == 2 { |
|
|
if len(parts) == 2 { |
|
|
startOffset := int64(0) |
|
|
|
|
|
endOffset := totalSize - 1 |
|
|
|
|
|
|
|
|
var startOffset, endOffset int64 |
|
|
|
|
|
|
|
|
|
|
|
// Handle different Range formats:
|
|
|
|
|
|
// 1. "bytes=0-499" - first 500 bytes (parts[0]="0", parts[1]="499")
|
|
|
|
|
|
// 2. "bytes=500-" - from byte 500 to end (parts[0]="500", parts[1]="")
|
|
|
|
|
|
// 3. "bytes=-500" - last 500 bytes (parts[0]="", parts[1]="500")
|
|
|
|
|
|
|
|
|
|
|
|
if parts[0] == "" && parts[1] != "" { |
|
|
|
|
|
// Suffix range: bytes=-N (last N bytes)
|
|
|
|
|
|
if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { |
|
|
|
|
|
if suffixLen > totalSize { |
|
|
|
|
|
suffixLen = totalSize |
|
|
|
|
|
} |
|
|
|
|
|
startOffset = totalSize - suffixLen |
|
|
|
|
|
endOffset = totalSize - 1 |
|
|
|
|
|
} else { |
|
|
|
|
|
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) |
|
|
|
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) |
|
|
|
|
|
return fmt.Errorf("invalid suffix range") |
|
|
|
|
|
} |
|
|
|
|
|
} else { |
|
|
|
|
|
// Regular range or open-ended range
|
|
|
|
|
|
startOffset = 0 |
|
|
|
|
|
endOffset = totalSize - 1 |
|
|
|
|
|
|
|
|
if parts[0] != "" { |
|
|
|
|
|
if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { |
|
|
|
|
|
startOffset = parsed |
|
|
|
|
|
|
|
|
if parts[0] != "" { |
|
|
|
|
|
if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { |
|
|
|
|
|
startOffset = parsed |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
if parts[1] != "" { |
|
|
|
|
|
if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { |
|
|
|
|
|
endOffset = parsed |
|
|
|
|
|
|
|
|
if parts[1] != "" { |
|
|
|
|
|
if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { |
|
|
|
|
|
endOffset = parsed |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Validate range
|
|
|
|
|
|
if startOffset < 0 || startOffset >= totalSize || endOffset < startOffset { |
|
|
|
|
|
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) |
|
|
|
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) |
|
|
|
|
|
return fmt.Errorf("invalid range") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Validate range
|
|
|
|
|
|
if startOffset < 0 || startOffset >= totalSize { |
|
|
|
|
|
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) |
|
|
|
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) |
|
|
|
|
|
return fmt.Errorf("invalid range start") |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if endOffset >= totalSize { |
|
|
|
|
|
endOffset = totalSize - 1 |
|
|
|
|
|
|
|
|
if endOffset >= totalSize { |
|
|
|
|
|
endOffset = totalSize - 1 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if endOffset < startOffset { |
|
|
|
|
|
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) |
|
|
|
|
|
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) |
|
|
|
|
|
return fmt.Errorf("invalid range: end before start") |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
offset = startOffset |
|
|
offset = startOffset |
|
|
@ -573,9 +602,9 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r |
|
|
if sseType == "" || sseType == "None" { |
|
|
if sseType == "" || sseType == "None" { |
|
|
return s3a.streamFromVolumeServers(w, r, entry, sseType) |
|
|
return s3a.streamFromVolumeServers(w, r, entry, sseType) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) |
|
|
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Validate SSE keys BEFORE streaming
|
|
|
// Validate SSE keys BEFORE streaming
|
|
|
var decryptionKey interface{} |
|
|
var decryptionKey interface{} |
|
|
switch sseType { |
|
|
switch sseType { |
|
|
@ -627,18 +656,18 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r |
|
|
} |
|
|
} |
|
|
decryptionKey = sseS3Key |
|
|
decryptionKey = sseS3Key |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Set response headers
|
|
|
// Set response headers
|
|
|
totalSize := int64(filer.FileSize(entry)) |
|
|
totalSize := int64(filer.FileSize(entry)) |
|
|
s3a.setResponseHeaders(w, entry, totalSize) |
|
|
s3a.setResponseHeaders(w, entry, totalSize) |
|
|
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) |
|
|
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Get encrypted data stream (without headers)
|
|
|
// Get encrypted data stream (without headers)
|
|
|
encryptedReader, err := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) |
|
|
encryptedReader, err := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Wrap with decryption
|
|
|
// Wrap with decryption
|
|
|
var decryptedReader io.Reader |
|
|
var decryptedReader io.Reader |
|
|
switch sseType { |
|
|
switch sseType { |
|
|
@ -656,11 +685,11 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r |
|
|
iv, _ := GetSSES3IV(entry, sseS3Key, keyManager) |
|
|
iv, _ := GetSSES3IV(entry, sseS3Key, keyManager) |
|
|
decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) |
|
|
decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return fmt.Errorf("failed to create decrypted reader: %w", err) |
|
|
return fmt.Errorf("failed to create decrypted reader: %w", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Stream decrypted data to client
|
|
|
// Stream decrypted data to client
|
|
|
buf := make([]byte, 128*1024) |
|
|
buf := make([]byte, 128*1024) |
|
|
_, copyErr := io.CopyBuffer(w, decryptedReader, buf) |
|
|
_, copyErr := io.CopyBuffer(w, decryptedReader, buf) |
|
|
@ -673,13 +702,13 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry |
|
|
if len(entry.Content) > 0 { |
|
|
if len(entry.Content) > 0 { |
|
|
return io.NopCloser(bytes.NewReader(entry.Content)), nil |
|
|
return io.NopCloser(bytes.NewReader(entry.Content)), nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Handle empty files
|
|
|
// Handle empty files
|
|
|
chunks := entry.GetChunks() |
|
|
chunks := entry.GetChunks() |
|
|
if len(chunks) == 0 { |
|
|
if len(chunks) == 0 { |
|
|
return io.NopCloser(bytes.NewReader([]byte{})), nil |
|
|
return io.NopCloser(bytes.NewReader([]byte{})), nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create lookup function
|
|
|
// Create lookup function
|
|
|
lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) { |
|
|
lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) { |
|
|
var urls []string |
|
|
var urls []string |
|
|
@ -700,14 +729,14 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry |
|
|
}) |
|
|
}) |
|
|
return urls, err |
|
|
return urls, err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Resolve chunks
|
|
|
// Resolve chunks
|
|
|
totalSize := int64(filer.FileSize(entry)) |
|
|
totalSize := int64(filer.FileSize(entry)) |
|
|
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize) |
|
|
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create streaming reader
|
|
|
// Create streaming reader
|
|
|
masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} |
|
|
masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} |
|
|
streamFn, err := filer.PrepareStreamContentWithThrottler( |
|
|
streamFn, err := filer.PrepareStreamContentWithThrottler( |
|
|
@ -724,14 +753,14 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Create a pipe to get io.ReadCloser
|
|
|
// Create a pipe to get io.ReadCloser
|
|
|
pipeReader, pipeWriter := io.Pipe() |
|
|
pipeReader, pipeWriter := io.Pipe() |
|
|
go func() { |
|
|
go func() { |
|
|
defer pipeWriter.Close() |
|
|
defer pipeWriter.Close() |
|
|
streamFn(pipeWriter) |
|
|
streamFn(pipeWriter) |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return pipeReader, nil |
|
|
return pipeReader, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -1061,7 +1090,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request |
|
|
// For HEAD requests, we already have all metadata - just set headers directly
|
|
|
// For HEAD requests, we already have all metadata - just set headers directly
|
|
|
totalSize := int64(filer.FileSize(objectEntryForSSE)) |
|
|
totalSize := int64(filer.FileSize(objectEntryForSSE)) |
|
|
s3a.setResponseHeaders(w, objectEntryForSSE, totalSize) |
|
|
s3a.setResponseHeaders(w, objectEntryForSSE, totalSize) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Detect and handle SSE
|
|
|
// Detect and handle SSE
|
|
|
sseType := s3a.detectPrimarySSEType(objectEntryForSSE) |
|
|
sseType := s3a.detectPrimarySSEType(objectEntryForSSE) |
|
|
if sseType != "" && sseType != "None" { |
|
|
if sseType != "" && sseType != "None" { |
|
|
@ -1089,7 +1118,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request |
|
|
// Add SSE response headers
|
|
|
// Add SSE response headers
|
|
|
s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType) |
|
|
s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
w.WriteHeader(http.StatusOK) |
|
|
w.WriteHeader(http.StatusOK) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|