You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
420 lines
15 KiB
420 lines
15 KiB
package s3api
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
)
|
|
|
|
// S3MultipartIAMManager handles IAM integration for multipart upload operations
|
|
type S3MultipartIAMManager struct {
|
|
s3iam *S3IAMIntegration
|
|
}
|
|
|
|
// NewS3MultipartIAMManager creates a new multipart IAM manager
|
|
func NewS3MultipartIAMManager(s3iam *S3IAMIntegration) *S3MultipartIAMManager {
|
|
return &S3MultipartIAMManager{
|
|
s3iam: s3iam,
|
|
}
|
|
}
|
|
|
|
// MultipartUploadRequest represents a multipart upload request
|
|
type MultipartUploadRequest struct {
|
|
Bucket string `json:"bucket"` // S3 bucket name
|
|
ObjectKey string `json:"object_key"` // S3 object key
|
|
UploadID string `json:"upload_id"` // Multipart upload ID
|
|
PartNumber int `json:"part_number"` // Part number for upload part
|
|
Operation string `json:"operation"` // Multipart operation type
|
|
SessionToken string `json:"session_token"` // JWT session token
|
|
Headers map[string]string `json:"headers"` // Request headers
|
|
ContentSize int64 `json:"content_size"` // Content size for validation
|
|
}
|
|
|
|
// MultipartUploadPolicy represents security policies for multipart uploads
|
|
type MultipartUploadPolicy struct {
|
|
MaxPartSize int64 `json:"max_part_size"` // Maximum part size (5GB AWS limit)
|
|
MinPartSize int64 `json:"min_part_size"` // Minimum part size (5MB AWS limit, except last part)
|
|
MaxParts int `json:"max_parts"` // Maximum number of parts (10,000 AWS limit)
|
|
MaxUploadDuration time.Duration `json:"max_upload_duration"` // Maximum time to complete multipart upload
|
|
AllowedContentTypes []string `json:"allowed_content_types"` // Allowed content types
|
|
RequiredHeaders []string `json:"required_headers"` // Required headers for validation
|
|
IPWhitelist []string `json:"ip_whitelist"` // Allowed IP addresses/ranges
|
|
}
|
|
|
|
// MultipartOperation represents different multipart upload operations
|
|
type MultipartOperation string
|
|
|
|
const (
|
|
MultipartOpInitiate MultipartOperation = "initiate"
|
|
MultipartOpUploadPart MultipartOperation = "upload_part"
|
|
MultipartOpComplete MultipartOperation = "complete"
|
|
MultipartOpAbort MultipartOperation = "abort"
|
|
MultipartOpList MultipartOperation = "list"
|
|
MultipartOpListParts MultipartOperation = "list_parts"
|
|
)
|
|
|
|
// ValidateMultipartOperationWithIAM validates multipart operations using IAM policies
|
|
func (iam *IdentityAccessManagement) ValidateMultipartOperationWithIAM(r *http.Request, identity *Identity, operation MultipartOperation) s3err.ErrorCode {
|
|
if iam.iamIntegration == nil {
|
|
// Fall back to standard validation
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// Extract bucket and object from request
|
|
bucket, object := s3_constants.GetBucketAndObject(r)
|
|
|
|
// Determine the S3 action based on multipart operation
|
|
action := determineMultipartS3Action(operation)
|
|
|
|
// Extract session token from request
|
|
sessionToken := extractSessionTokenFromRequest(r)
|
|
if sessionToken == "" {
|
|
// No session token - use standard auth
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// Retrieve the actual principal ARN from the request header
|
|
// This header is set during initial authentication and contains the correct assumed role ARN
|
|
principalArn := r.Header.Get("X-SeaweedFS-Principal")
|
|
if principalArn == "" {
|
|
glog.V(0).Info("IAM authorization for multipart operation failed: missing principal ARN in request header")
|
|
return s3err.ErrAccessDenied
|
|
}
|
|
|
|
// Create IAM identity for authorization
|
|
iamIdentity := &IAMIdentity{
|
|
Name: identity.Name,
|
|
Principal: principalArn,
|
|
SessionToken: sessionToken,
|
|
Account: identity.Account,
|
|
}
|
|
|
|
// Authorize using IAM
|
|
ctx := r.Context()
|
|
errCode := iam.iamIntegration.AuthorizeAction(ctx, iamIdentity, action, bucket, object, r)
|
|
if errCode != s3err.ErrNone {
|
|
glog.V(3).Infof("IAM authorization failed for multipart operation: principal=%s operation=%s action=%s bucket=%s object=%s",
|
|
iamIdentity.Principal, operation, action, bucket, object)
|
|
return errCode
|
|
}
|
|
|
|
glog.V(3).Infof("IAM authorization succeeded for multipart operation: principal=%s operation=%s action=%s bucket=%s object=%s",
|
|
iamIdentity.Principal, operation, action, bucket, object)
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// ValidateMultipartRequestWithPolicy validates multipart request against security policy
|
|
func (policy *MultipartUploadPolicy) ValidateMultipartRequestWithPolicy(req *MultipartUploadRequest) error {
|
|
if req == nil {
|
|
return fmt.Errorf("multipart request cannot be nil")
|
|
}
|
|
|
|
// Validate part size for upload part operations
|
|
if req.Operation == string(MultipartOpUploadPart) {
|
|
if req.ContentSize > policy.MaxPartSize {
|
|
return fmt.Errorf("part size %d exceeds maximum allowed %d", req.ContentSize, policy.MaxPartSize)
|
|
}
|
|
|
|
// Minimum part size validation (except for last part)
|
|
// Note: Last part validation would require knowing if this is the final part
|
|
if req.ContentSize < policy.MinPartSize && req.ContentSize > 0 {
|
|
glog.V(2).Infof("Part size %d is below minimum %d - assuming last part", req.ContentSize, policy.MinPartSize)
|
|
}
|
|
|
|
// Validate part number
|
|
if req.PartNumber < 1 || req.PartNumber > policy.MaxParts {
|
|
return fmt.Errorf("part number %d is invalid (must be 1-%d)", req.PartNumber, policy.MaxParts)
|
|
}
|
|
}
|
|
|
|
// Validate required headers first
|
|
if req.Headers != nil {
|
|
for _, requiredHeader := range policy.RequiredHeaders {
|
|
if _, exists := req.Headers[requiredHeader]; !exists {
|
|
// Check lowercase version
|
|
if _, exists := req.Headers[strings.ToLower(requiredHeader)]; !exists {
|
|
return fmt.Errorf("required header %s is missing", requiredHeader)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Validate content type if specified
|
|
if len(policy.AllowedContentTypes) > 0 && req.Headers != nil {
|
|
contentType := req.Headers["Content-Type"]
|
|
if contentType == "" {
|
|
contentType = req.Headers["content-type"]
|
|
}
|
|
|
|
allowed := false
|
|
for _, allowedType := range policy.AllowedContentTypes {
|
|
if contentType == allowedType {
|
|
allowed = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !allowed {
|
|
return fmt.Errorf("content type %s is not allowed", contentType)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Enhanced multipart handlers with IAM integration
|
|
|
|
// NewMultipartUploadWithIAM handles initiate multipart upload with IAM validation
|
|
func (s3a *S3ApiServer) NewMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
|
|
// Validate IAM permissions first
|
|
if s3a.iam.iamIntegration != nil {
|
|
if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else {
|
|
// Additional multipart-specific IAM validation
|
|
if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpInitiate); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delegate to existing handler
|
|
s3a.NewMultipartUploadHandler(w, r)
|
|
}
|
|
|
|
// CompleteMultipartUploadWithIAM handles complete multipart upload with IAM validation
|
|
func (s3a *S3ApiServer) CompleteMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
|
|
// Validate IAM permissions first
|
|
if s3a.iam.iamIntegration != nil {
|
|
if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else {
|
|
// Additional multipart-specific IAM validation
|
|
if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpComplete); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delegate to existing handler
|
|
s3a.CompleteMultipartUploadHandler(w, r)
|
|
}
|
|
|
|
// AbortMultipartUploadWithIAM handles abort multipart upload with IAM validation
|
|
func (s3a *S3ApiServer) AbortMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
|
|
// Validate IAM permissions first
|
|
if s3a.iam.iamIntegration != nil {
|
|
if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else {
|
|
// Additional multipart-specific IAM validation
|
|
if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpAbort); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delegate to existing handler
|
|
s3a.AbortMultipartUploadHandler(w, r)
|
|
}
|
|
|
|
// ListMultipartUploadsWithIAM handles list multipart uploads with IAM validation
|
|
func (s3a *S3ApiServer) ListMultipartUploadsWithIAM(w http.ResponseWriter, r *http.Request) {
|
|
// Validate IAM permissions first
|
|
if s3a.iam.iamIntegration != nil {
|
|
if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_LIST); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else {
|
|
// Additional multipart-specific IAM validation
|
|
if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpList); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delegate to existing handler
|
|
s3a.ListMultipartUploadsHandler(w, r)
|
|
}
|
|
|
|
// UploadPartWithIAM handles upload part with IAM validation
|
|
func (s3a *S3ApiServer) UploadPartWithIAM(w http.ResponseWriter, r *http.Request) {
|
|
// Validate IAM permissions first
|
|
if s3a.iam.iamIntegration != nil {
|
|
if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else {
|
|
// Additional multipart-specific IAM validation
|
|
if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpUploadPart); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
// Validate part size and other policies
|
|
if err := s3a.validateUploadPartRequest(r); err != nil {
|
|
glog.Errorf("Upload part validation failed: %v", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delegate to existing object PUT handler (which handles upload part)
|
|
s3a.PutObjectHandler(w, r)
|
|
}
|
|
|
|
// Helper functions
|
|
|
|
// determineMultipartS3Action maps multipart operations to granular S3 actions
|
|
// This enables fine-grained IAM policies for multipart upload operations
|
|
func determineMultipartS3Action(operation MultipartOperation) Action {
|
|
switch operation {
|
|
case MultipartOpInitiate:
|
|
return s3_constants.ACTION_CREATE_MULTIPART_UPLOAD
|
|
case MultipartOpUploadPart:
|
|
return s3_constants.ACTION_UPLOAD_PART
|
|
case MultipartOpComplete:
|
|
return s3_constants.ACTION_COMPLETE_MULTIPART
|
|
case MultipartOpAbort:
|
|
return s3_constants.ACTION_ABORT_MULTIPART
|
|
case MultipartOpList:
|
|
return s3_constants.ACTION_LIST_MULTIPART_UPLOADS
|
|
case MultipartOpListParts:
|
|
return s3_constants.ACTION_LIST_PARTS
|
|
default:
|
|
// Fail closed for unmapped operations to prevent unintended access
|
|
glog.Errorf("unmapped multipart operation: %s", operation)
|
|
return "s3:InternalErrorUnknownMultipartAction" // Non-existent action ensures denial
|
|
}
|
|
}
|
|
|
|
// extractSessionTokenFromRequest extracts session token from various request sources
|
|
func extractSessionTokenFromRequest(r *http.Request) string {
|
|
// Check Authorization header for Bearer token
|
|
if authHeader := r.Header.Get("Authorization"); authHeader != "" {
|
|
if strings.HasPrefix(authHeader, "Bearer ") {
|
|
return strings.TrimPrefix(authHeader, "Bearer ")
|
|
}
|
|
}
|
|
|
|
// Check X-Amz-Security-Token header
|
|
if token := r.Header.Get("X-Amz-Security-Token"); token != "" {
|
|
return token
|
|
}
|
|
|
|
// Check query parameters for presigned URL tokens
|
|
if token := r.URL.Query().Get("X-Amz-Security-Token"); token != "" {
|
|
return token
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// validateUploadPartRequest validates upload part request against policies
|
|
func (s3a *S3ApiServer) validateUploadPartRequest(r *http.Request) error {
|
|
// Get default multipart policy
|
|
policy := DefaultMultipartUploadPolicy()
|
|
|
|
// Extract part number from query
|
|
partNumberStr := r.URL.Query().Get("partNumber")
|
|
if partNumberStr == "" {
|
|
return fmt.Errorf("missing partNumber parameter")
|
|
}
|
|
|
|
partNumber, err := strconv.Atoi(partNumberStr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid partNumber: %v", err)
|
|
}
|
|
|
|
// Get content length
|
|
contentLength := r.ContentLength
|
|
if contentLength < 0 {
|
|
contentLength = 0
|
|
}
|
|
|
|
// Create multipart request for validation
|
|
bucket, object := s3_constants.GetBucketAndObject(r)
|
|
multipartReq := &MultipartUploadRequest{
|
|
Bucket: bucket,
|
|
ObjectKey: object,
|
|
PartNumber: partNumber,
|
|
Operation: string(MultipartOpUploadPart),
|
|
ContentSize: contentLength,
|
|
Headers: make(map[string]string),
|
|
}
|
|
|
|
// Copy relevant headers
|
|
for key, values := range r.Header {
|
|
if len(values) > 0 {
|
|
multipartReq.Headers[key] = values[0]
|
|
}
|
|
}
|
|
|
|
// Validate against policy
|
|
return policy.ValidateMultipartRequestWithPolicy(multipartReq)
|
|
}
|
|
|
|
// DefaultMultipartUploadPolicy returns a default multipart upload security policy
|
|
func DefaultMultipartUploadPolicy() *MultipartUploadPolicy {
|
|
return &MultipartUploadPolicy{
|
|
MaxPartSize: 5 * 1024 * 1024 * 1024, // 5GB AWS limit
|
|
MinPartSize: 5 * 1024 * 1024, // 5MB AWS minimum (except last part)
|
|
MaxParts: 10000, // AWS limit
|
|
MaxUploadDuration: 7 * 24 * time.Hour, // 7 days to complete upload
|
|
AllowedContentTypes: []string{}, // Empty means all types allowed
|
|
RequiredHeaders: []string{}, // No required headers by default
|
|
IPWhitelist: []string{}, // Empty means no IP restrictions
|
|
}
|
|
}
|
|
|
|
// MultipartUploadSession represents an ongoing multipart upload session
|
|
type MultipartUploadSession struct {
|
|
UploadID string `json:"upload_id"`
|
|
Bucket string `json:"bucket"`
|
|
ObjectKey string `json:"object_key"`
|
|
Initiator string `json:"initiator"` // User who initiated the upload
|
|
Owner string `json:"owner"` // Object owner
|
|
CreatedAt time.Time `json:"created_at"` // When upload was initiated
|
|
Parts []MultipartUploadPart `json:"parts"` // Uploaded parts
|
|
Metadata map[string]string `json:"metadata"` // Object metadata
|
|
Policy *MultipartUploadPolicy `json:"policy"` // Applied security policy
|
|
SessionToken string `json:"session_token"` // IAM session token
|
|
}
|
|
|
|
// MultipartUploadPart represents an uploaded part
|
|
type MultipartUploadPart struct {
|
|
PartNumber int `json:"part_number"`
|
|
Size int64 `json:"size"`
|
|
ETag string `json:"etag"`
|
|
LastModified time.Time `json:"last_modified"`
|
|
Checksum string `json:"checksum"` // Optional integrity checksum
|
|
}
|
|
|
|
// GetMultipartUploadSessions retrieves active multipart upload sessions for a bucket
|
|
func (s3a *S3ApiServer) GetMultipartUploadSessions(bucket string) ([]*MultipartUploadSession, error) {
|
|
// This would typically query the filer for active multipart uploads
|
|
// For now, return empty list as this is a placeholder for the full implementation
|
|
return []*MultipartUploadSession{}, nil
|
|
}
|
|
|
|
// CleanupExpiredMultipartUploads removes expired multipart upload sessions
|
|
func (s3a *S3ApiServer) CleanupExpiredMultipartUploads(maxAge time.Duration) error {
|
|
// This would typically scan for and remove expired multipart uploads
|
|
// Implementation would depend on how multipart sessions are stored in the filer
|
|
glog.V(2).Infof("Cleanup expired multipart uploads older than %v", maxAge)
|
|
return nil
|
|
}
|