diff --git a/test/s3/sse/s3_sse_range_server_test.go b/test/s3/sse/s3_sse_range_server_test.go index 50e802daf..0b02ec62b 100644 --- a/test/s3/sse/s3_sse_range_server_test.go +++ b/test/s3/sse/s3_sse_range_server_test.go @@ -443,4 +443,3 @@ func TestSSEMultipartRangeRequestsServerBehavior(t *testing.T) { assert.Equal(t, expectedData, bodyBytes, "Cross-part range content must be correctly decrypted and assembled") } - diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go index 1cf291872..352b329f8 100644 --- a/weed/operation/upload_chunked.go +++ b/weed/operation/upload_chunked.go @@ -26,16 +26,16 @@ type ChunkedUploadResult struct { // ChunkedUploadOption contains options for chunked uploads type ChunkedUploadOption struct { - ChunkSize int32 - SmallFileLimit int64 - Collection string - Replication string - DataCenter string - SaveSmallInline bool - Jwt security.EncodedJwt - MimeType string - AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) - UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing + ChunkSize int32 + SmallFileLimit int64 + Collection string + Replication string + DataCenter string + SaveSmallInline bool + Jwt security.EncodedJwt + MimeType string + AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) + UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing } var chunkBufferPool = sync.Pool{ @@ -48,25 +48,25 @@ var chunkBufferPool = sync.Pool{ // This prevents OOM by processing the stream in fixed-size chunks // Returns file chunks, MD5 hash, total size, and any small content stored inline func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) { - + md5Hash := md5.New() var partReader = io.TeeReader(reader, md5Hash) - + var fileChunks []*filer_pb.FileChunk var fileChunksLock sync.Mutex var uploadErr error var uploadErrLock sync.Mutex var chunkOffset int64 = 0 - + var wg sync.WaitGroup const bytesBufferCounter = 4 bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter) - + uploadLoop: for { // Throttle buffer usage bytesBufferLimitChan <- struct{}{} - + // Check for errors from parallel uploads uploadErrLock.Lock() if uploadErr != nil { @@ -75,7 +75,7 @@ uploadLoop: break } uploadErrLock.Unlock() - + // Check for context cancellation select { case <-ctx.Done(): @@ -88,12 +88,12 @@ uploadLoop: break uploadLoop default: } - + // Get buffer from pool bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer) limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize)) bytesBuffer.Reset() - + // Read one chunk dataSize, err := bytesBuffer.ReadFrom(limitedReader) if err != nil { @@ -120,26 +120,26 @@ uploadLoop: // which is valid (e.g., touch command creates 0-byte files) break } - - // For small files at offset 0, store inline instead of uploading - if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit { - smallContent := make([]byte, dataSize) - n, readErr := io.ReadFull(bytesBuffer, smallContent) - chunkBufferPool.Put(bytesBuffer) - <-bytesBufferLimitChan - - if readErr != nil { - return nil, fmt.Errorf("failed to read small content: read %d of %d bytes: %w", n, dataSize, readErr) + + // For small files at offset 0, store inline instead of uploading + if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit { + smallContent := make([]byte, dataSize) + n, readErr := io.ReadFull(bytesBuffer, smallContent) + chunkBufferPool.Put(bytesBuffer) + <-bytesBufferLimitChan + + if readErr != nil { + return nil, fmt.Errorf("failed to read small content: read %d of %d bytes: %w", n, dataSize, readErr) + } + + return &ChunkedUploadResult{ + FileChunks: nil, + Md5Hash: md5Hash, + TotalSize: dataSize, + SmallContent: smallContent, + }, nil } - - return &ChunkedUploadResult{ - FileChunks: nil, - Md5Hash: md5Hash, - TotalSize: dataSize, - SmallContent: smallContent, - }, nil - } - + // Upload chunk in parallel goroutine wg.Add(1) go func(offset int64, buf *bytes.Buffer) { @@ -148,7 +148,7 @@ uploadLoop: <-bytesBufferLimitChan wg.Done() }() - + // Assign volume for this chunk _, assignResult, assignErr := opt.AssignFunc(ctx, 1) if assignErr != nil { @@ -159,29 +159,29 @@ uploadLoop: uploadErrLock.Unlock() return } - - // Upload chunk data - uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) - - // Use per-assignment JWT if present, otherwise fall back to the original JWT - // This is critical for secured clusters where each volume assignment has its own JWT - jwt := opt.Jwt - if assignResult.Auth != "" { - jwt = assignResult.Auth - } - - uploadOption := &UploadOption{ - UploadUrl: uploadUrl, - Cipher: false, - IsInputCompressed: false, - MimeType: opt.MimeType, - PairMap: nil, - Jwt: jwt, - } - + + // Upload chunk data + uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + + // Use per-assignment JWT if present, otherwise fall back to the original JWT + // This is critical for secured clusters where each volume assignment has its own JWT + jwt := opt.Jwt + if assignResult.Auth != "" { + jwt = assignResult.Auth + } + + uploadOption := &UploadOption{ + UploadUrl: uploadUrl, + Cipher: false, + IsInputCompressed: false, + MimeType: opt.MimeType, + PairMap: nil, + Jwt: jwt, + } + var uploadResult *UploadResult var uploadResultErr error - + // Use mock upload function if provided (for testing), otherwise use real uploader if opt.UploadFunc != nil { uploadResult, uploadResultErr = opt.UploadFunc(ctx, buf.Bytes(), uploadOption) @@ -197,7 +197,7 @@ uploadLoop: } uploadResult, uploadResultErr = uploader.UploadData(ctx, buf.Bytes(), uploadOption) } - + if uploadResultErr != nil { uploadErrLock.Lock() if uploadErr == nil { @@ -206,46 +206,46 @@ uploadLoop: uploadErrLock.Unlock() return } - - // Create chunk entry - // Set ModifiedTsNs to current time (nanoseconds) to track when upload completed - // This is critical for multipart uploads where the same part may be uploaded multiple times - // The part with the latest ModifiedTsNs is selected as the authoritative version - fid, _ := filer_pb.ToFileIdObject(assignResult.Fid) - chunk := &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: offset, - Size: uint64(uploadResult.Size), - ModifiedTsNs: time.Now().UnixNano(), - ETag: uploadResult.ContentMd5, - Fid: fid, - CipherKey: uploadResult.CipherKey, - } - + + // Create chunk entry + // Set ModifiedTsNs to current time (nanoseconds) to track when upload completed + // This is critical for multipart uploads where the same part may be uploaded multiple times + // The part with the latest ModifiedTsNs is selected as the authoritative version + fid, _ := filer_pb.ToFileIdObject(assignResult.Fid) + chunk := &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: offset, + Size: uint64(uploadResult.Size), + ModifiedTsNs: time.Now().UnixNano(), + ETag: uploadResult.ContentMd5, + Fid: fid, + CipherKey: uploadResult.CipherKey, + } + fileChunksLock.Lock() fileChunks = append(fileChunks, chunk) glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) fileChunksLock.Unlock() - + }(chunkOffset, bytesBuffer) - + // Update offset for next chunk chunkOffset += dataSize - + // If this was a partial chunk, we're done if dataSize < int64(opt.ChunkSize) { break } } - + // Wait for all uploads to complete wg.Wait() - + // Sort chunks by offset (do this even if there's an error, for cleanup purposes) sort.Slice(fileChunks, func(i, j int) bool { return fileChunks[i].Offset < fileChunks[j].Offset }) - + // Check for errors - return partial results for cleanup if uploadErr != nil { glog.Errorf("chunked upload failed: %v (returning %d partial chunks for cleanup)", uploadErr, len(fileChunks)) @@ -257,7 +257,7 @@ uploadLoop: SmallContent: nil, }, uploadErr } - + return &ChunkedUploadResult{ FileChunks: fileChunks, Md5Hash: md5Hash, @@ -265,4 +265,3 @@ uploadLoop: SmallContent: nil, }, nil } - diff --git a/weed/operation/upload_chunked_test.go b/weed/operation/upload_chunked_test.go index 3eeb96857..ec7ffbba2 100644 --- a/weed/operation/upload_chunked_test.go +++ b/weed/operation/upload_chunked_test.go @@ -16,13 +16,13 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { // Create test data larger than one chunk to force multiple chunk uploads testData := bytes.Repeat([]byte("test data for chunk upload failure testing"), 1000) // ~40KB reader := bytes.NewReader(testData) - + uploadAttempts := 0 - + // Create a mock assign function that succeeds for first chunk, then fails assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { uploadAttempts++ - + if uploadAttempts == 1 { // First chunk succeeds return nil, &AssignResult{ @@ -32,11 +32,11 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { Count: 1, }, nil } - + // Second chunk fails (simulating volume server down or network error) return nil, nil, errors.New("simulated volume assignment failure") } - + // Mock upload function that simulates successful upload uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { return &UploadResult{ @@ -46,7 +46,7 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { Error: "", }, nil } - + // Attempt upload with small chunk size to trigger multiple uploads result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{ ChunkSize: 8 * 1024, // 8KB chunks @@ -57,32 +57,32 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { AssignFunc: assignFunc, UploadFunc: uploadFunc, }) - + // VERIFICATION 1: Error should be returned if err == nil { t.Fatal("Expected error from UploadReaderInChunks, got nil") } t.Logf("✓ Got expected error: %v", err) - + // VERIFICATION 2: Result should NOT be nil (this is the fix) if result == nil { t.Fatal("CRITICAL: UploadReaderInChunks returned nil result on error - caller cannot cleanup orphaned chunks!") } t.Log("✓ Result is not nil (partial results returned)") - + // VERIFICATION 3: Result should contain partial chunks from successful uploads // Note: In reality, the first chunk upload would succeed before assignment fails for chunk 2 // But in this test, assignment fails immediately for chunk 2, so we may have 0 chunks // The important thing is that the result struct is returned, not that it has chunks t.Logf("✓ Result contains %d chunks (may be 0 if all assignments failed)", len(result.FileChunks)) - + // VERIFICATION 4: MD5 hash should be available even on partial failure if result.Md5Hash == nil { t.Error("Expected Md5Hash to be non-nil") } else { t.Log("✓ Md5Hash is available for partial data") } - + // VERIFICATION 5: TotalSize should reflect bytes read before failure if result.TotalSize < 0 { t.Errorf("Expected non-negative TotalSize, got %d", result.TotalSize) @@ -95,7 +95,7 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { func TestUploadReaderInChunksSuccessPath(t *testing.T) { testData := []byte("small test data") reader := bytes.NewReader(testData) - + // Mock assign function that always succeeds assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { return nil, &AssignResult{ @@ -105,7 +105,7 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) { Count: 1, }, nil } - + // Mock upload function that simulates successful upload uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { return &UploadResult{ @@ -115,7 +115,7 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) { Error: "", }, nil } - + result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{ ChunkSize: 8 * 1024, SmallFileLimit: 256, @@ -125,40 +125,40 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) { AssignFunc: assignFunc, UploadFunc: uploadFunc, }) - + // VERIFICATION 1: No error should occur if err != nil { t.Fatalf("Expected successful upload, got error: %v", err) } t.Log("✓ Upload completed without error") - + // VERIFICATION 2: Result should not be nil if result == nil { t.Fatal("Expected non-nil result") } t.Log("✓ Result is not nil") - + // VERIFICATION 3: Should have file chunks if len(result.FileChunks) == 0 { t.Error("Expected at least one file chunk") } else { t.Logf("✓ Result contains %d file chunk(s)", len(result.FileChunks)) } - + // VERIFICATION 4: Total size should match input data if result.TotalSize != int64(len(testData)) { t.Errorf("Expected TotalSize=%d, got %d", len(testData), result.TotalSize) } else { t.Logf("✓ TotalSize=%d matches input data", result.TotalSize) } - + // VERIFICATION 5: MD5 hash should be available if result.Md5Hash == nil { t.Error("Expected non-nil Md5Hash") } else { t.Log("✓ Md5Hash is available") } - + // VERIFICATION 6: Chunk should have expected properties if len(result.FileChunks) > 0 { chunk := result.FileChunks[0] @@ -180,13 +180,13 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) { func TestUploadReaderInChunksContextCancellation(t *testing.T) { testData := bytes.Repeat([]byte("test data"), 10000) // ~80KB reader := bytes.NewReader(testData) - + // Create a context that we'll cancel ctx, cancel := context.WithCancel(context.Background()) - + // Cancel immediately to trigger cancellation handling cancel() - + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { return nil, &AssignResult{ Fid: "test-fid,1234", @@ -195,7 +195,7 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) { Count: 1, }, nil } - + // Mock upload function that simulates successful upload uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { return &UploadResult{ @@ -205,7 +205,7 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) { Error: "", }, nil } - + result, err := UploadReaderInChunks(ctx, reader, &ChunkedUploadOption{ ChunkSize: 8 * 1024, SmallFileLimit: 256, @@ -215,12 +215,12 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) { AssignFunc: assignFunc, UploadFunc: uploadFunc, }) - + // Should get context cancelled error if err == nil { t.Error("Expected context cancellation error") } - + // Should still get partial results for cleanup if result == nil { t.Error("Expected non-nil result even on context cancellation") @@ -240,7 +240,7 @@ func (m *mockFailingReader) Read(p []byte) (n int, err error) { if m.pos >= m.failAfter { return 0, errors.New("simulated read failure") } - + remaining := m.failAfter - m.pos toRead := len(p) if toRead > remaining { @@ -249,11 +249,11 @@ func (m *mockFailingReader) Read(p []byte) (n int, err error) { if toRead > len(m.data)-m.pos { toRead = len(m.data) - m.pos } - + if toRead == 0 { return 0, io.EOF } - + copy(p, m.data[m.pos:m.pos+toRead]) m.pos += toRead return toRead, nil @@ -267,7 +267,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) { pos: 0, failAfter: 10000, // Fail after 10KB } - + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { return nil, &AssignResult{ Fid: "test-fid,1234", @@ -276,7 +276,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) { Count: 1, }, nil } - + // Mock upload function that simulates successful upload uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { return &UploadResult{ @@ -286,7 +286,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) { Error: "", }, nil } - + result, err := UploadReaderInChunks(context.Background(), failingReader, &ChunkedUploadOption{ ChunkSize: 8 * 1024, // 8KB chunks SmallFileLimit: 256, @@ -296,18 +296,17 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) { AssignFunc: assignFunc, UploadFunc: uploadFunc, }) - + // Should get read error if err == nil { t.Error("Expected read failure error") } - + // Should still get partial results if result == nil { t.Fatal("Expected non-nil result on read failure") } - + t.Logf("✓ Got partial result on read failure: chunks=%d, totalSize=%d", len(result.FileChunks), result.TotalSize) } - diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go index c8dd19d59..c776f83d7 100644 --- a/weed/pb/filer_pb/filer_pb_helper.go +++ b/weed/pb/filer_pb/filer_pb_helper.go @@ -39,7 +39,7 @@ func (entry *Entry) GetExpiryTime() (expiryTime int64) { return expiryTime } } - + // Regular TTL expiration: base on creation time only expiryTime = entry.Attributes.Crtime + int64(entry.Attributes.TtlSec) return expiryTime diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 953eb8a9a..289fbd556 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -53,7 +53,7 @@ type IdentityAccessManagement struct { // IAM Integration for advanced features iamIntegration *S3IAMIntegration - + // Bucket policy engine for evaluating bucket policies policyEngine *BucketPolicyEngine } @@ -177,41 +177,41 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID") secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") - if accessKeyId != "" && secretAccessKey != "" { - glog.V(1).Infof("No S3 configuration found, using AWS environment variables as fallback") + if accessKeyId != "" && secretAccessKey != "" { + glog.V(1).Infof("No S3 configuration found, using AWS environment variables as fallback") - // Create environment variable identity name - identityNameSuffix := accessKeyId - if len(accessKeyId) > 8 { - identityNameSuffix = accessKeyId[:8] - } + // Create environment variable identity name + identityNameSuffix := accessKeyId + if len(accessKeyId) > 8 { + identityNameSuffix = accessKeyId[:8] + } - // Create admin identity with environment variable credentials - envIdentity := &Identity{ - Name: "admin-" + identityNameSuffix, - Account: &AccountAdmin, - Credentials: []*Credential{ - { - AccessKey: accessKeyId, - SecretKey: secretAccessKey, + // Create admin identity with environment variable credentials + envIdentity := &Identity{ + Name: "admin-" + identityNameSuffix, + Account: &AccountAdmin, + Credentials: []*Credential{ + { + AccessKey: accessKeyId, + SecretKey: secretAccessKey, + }, }, - }, - Actions: []Action{ - s3_constants.ACTION_ADMIN, - }, - } + Actions: []Action{ + s3_constants.ACTION_ADMIN, + }, + } - // Set as the only configuration - iam.m.Lock() - if len(iam.identities) == 0 { - iam.identities = []*Identity{envIdentity} - iam.accessKeyIdent = map[string]*Identity{accessKeyId: envIdentity} - iam.isAuthEnabled = true - } - iam.m.Unlock() + // Set as the only configuration + iam.m.Lock() + if len(iam.identities) == 0 { + iam.identities = []*Identity{envIdentity} + iam.accessKeyIdent = map[string]*Identity{accessKeyId: envIdentity} + iam.isAuthEnabled = true + } + iam.m.Unlock() - glog.V(1).Infof("Added admin identity from AWS environment variables: %s", envIdentity.Name) - } + glog.V(1).Infof("Added admin identity from AWS environment variables: %s", envIdentity.Name) + } } return iam @@ -460,13 +460,13 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) case authTypeJWT: glog.V(3).Infof("jwt auth type detected, iamIntegration != nil? %t", iam.iamIntegration != nil) r.Header.Set(s3_constants.AmzAuthType, "Jwt") - if iam.iamIntegration != nil { - identity, s3Err = iam.authenticateJWTWithIAM(r) - authType = "Jwt" - } else { - glog.V(2).Infof("IAM integration is nil, returning ErrNotImplemented") - return identity, s3err.ErrNotImplemented - } + if iam.iamIntegration != nil { + identity, s3Err = iam.authenticateJWTWithIAM(r) + authType = "Jwt" + } else { + glog.V(2).Infof("IAM integration is nil, returning ErrNotImplemented") + return identity, s3err.ErrNotImplemented + } case authTypeAnonymous: authType = "Anonymous" if identity, found = iam.lookupAnonymous(); !found { @@ -501,7 +501,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) // For ListBuckets, authorization is performed in the handler by iterating // through buckets and checking permissions for each. Skip the global check here. policyAllows := false - + if action == s3_constants.ACTION_LIST && bucket == "" { // ListBuckets operation - authorization handled per-bucket in the handler } else { @@ -515,7 +515,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) principal := buildPrincipalARN(identity) // Use context-aware policy evaluation to get the correct S3 action allowed, evaluated, err := iam.policyEngine.EvaluatePolicyWithContext(bucket, object, string(action), principal, r) - + if err != nil { // SECURITY: Fail-close on policy evaluation errors // If we can't evaluate the policy, deny access rather than falling through to IAM @@ -537,7 +537,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) } // If not evaluated (no policy or no matching statements), fall through to IAM/identity checks } - + // Only check IAM if bucket policy didn't explicitly allow // This ensures bucket policies can independently grant access (AWS semantics) if !policyAllows { @@ -617,26 +617,26 @@ func buildPrincipalARN(identity *Identity) string { if identity == nil { return "*" // Anonymous } - + // Check if this is the anonymous user identity (authenticated as anonymous) // S3 policies expect Principal: "*" for anonymous access - if identity.Name == s3_constants.AccountAnonymousId || - (identity.Account != nil && identity.Account.Id == s3_constants.AccountAnonymousId) { + if identity.Name == s3_constants.AccountAnonymousId || + (identity.Account != nil && identity.Account.Id == s3_constants.AccountAnonymousId) { return "*" // Anonymous user } - + // Build an AWS-compatible principal ARN // Format: arn:aws:iam::account-id:user/user-name accountId := identity.Account.Id if accountId == "" { accountId = "000000000000" // Default account ID } - + userName := identity.Name if userName == "" { userName = "unknown" } - + return fmt.Sprintf("arn:aws:iam::%s:user/%s", accountId, userName) } diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index e03b1c05b..ffb99fe2c 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -145,7 +145,7 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) } else { glog.V(3).Infof("updateBucketConfigCacheFromEntry: no Object Lock configuration found for bucket %s", bucket) } - + // Load bucket policy if present (for performance optimization) config.BucketPolicy = loadBucketPolicyFromExtended(entry, bucket) } diff --git a/weed/s3api/custom_types.go b/weed/s3api/custom_types.go index ea769ac4f..3d7a06ffa 100644 --- a/weed/s3api/custom_types.go +++ b/weed/s3api/custom_types.go @@ -10,6 +10,6 @@ const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00" // ConditionalHeaderResult holds the result of conditional header checking type ConditionalHeaderResult struct { ErrorCode s3err.ErrorCode - ETag string // ETag of the object (for 304 responses) - Entry *filer_pb.Entry // Entry fetched during conditional check (nil if not fetched or object doesn't exist) + ETag string // ETag of the object (for 304 responses) + Entry *filer_pb.Entry // Entry fetched during conditional check (nil if not fetched or object doesn't exist) } diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 168cdc3ef..10afab106 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -137,9 +137,9 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3 } // processDirectoryTTL processes a single directory in paginated batches -func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, +func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error { - + const batchSize = filer.PaginationSize startFrom := "" diff --git a/weed/s3api/policy_conversion.go b/weed/s3api/policy_conversion.go index 27a8d7560..e22827e3a 100644 --- a/weed/s3api/policy_conversion.go +++ b/weed/s3api/policy_conversion.go @@ -140,13 +140,13 @@ func convertPrincipal(principal interface{}) (*policy_engine.StringOrStringSlice // Handle AWS-style principal with service/user keys // Example: {"AWS": "arn:aws:iam::123456789012:user/Alice"} // Only AWS principals are supported for now. Other types like Service or Federated need special handling. - + awsPrincipals, ok := p["AWS"] if !ok || len(p) != 1 { glog.Warningf("unsupported principal map, only a single 'AWS' key is supported: %v", p) return nil, fmt.Errorf("unsupported principal map, only a single 'AWS' key is supported, got keys: %v", getMapKeys(p)) } - + // Recursively convert the AWS principal value res, err := convertPrincipal(awsPrincipals) if err != nil { @@ -236,4 +236,3 @@ func getMapKeys(m map[string]interface{}) []string { } return keys } - diff --git a/weed/s3api/policy_conversion_test.go b/weed/s3api/policy_conversion_test.go index e7a77126f..ef98c9fbc 100644 --- a/weed/s3api/policy_conversion_test.go +++ b/weed/s3api/policy_conversion_test.go @@ -13,10 +13,10 @@ func TestConvertPolicyDocumentWithMixedTypes(t *testing.T) { Version: "2012-10-17", Statement: []policy.Statement{ { - Sid: "TestMixedTypes", - Effect: "Allow", - Action: []string{"s3:GetObject"}, - Resource: []string{"arn:aws:s3:::bucket/*"}, + Sid: "TestMixedTypes", + Effect: "Allow", + Action: []string{"s3:GetObject"}, + Resource: []string{"arn:aws:s3:::bucket/*"}, Principal: []interface{}{"user1", 123, true}, // Mixed types Condition: map[string]map[string]interface{}{ "NumericEquals": { @@ -90,7 +90,7 @@ func TestConvertPolicyDocumentWithMixedTypes(t *testing.T) { } } - // Check StringEquals condition + // Check StringEquals condition stringCond, ok := stmt.Condition["StringEquals"] if !ok { t.Fatal("Expected StringEquals condition") @@ -116,7 +116,7 @@ func TestConvertPrincipalWithMapAndMixedTypes(t *testing.T) { principalMap := map[string]interface{}{ "AWS": []interface{}{ "arn:aws:iam::123456789012:user/Alice", - 456, // User ID as number + 456, // User ID as number true, // Some boolean value }, } @@ -125,7 +125,7 @@ func TestConvertPrincipalWithMapAndMixedTypes(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - + if result == nil { t.Fatal("Expected non-nil result") } @@ -230,7 +230,7 @@ func TestConvertPrincipalWithNilValues(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - + if result == nil { t.Fatal("Expected non-nil result") } @@ -296,7 +296,7 @@ func TestConvertPrincipalMapWithNilValues(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - + if result == nil { t.Fatal("Expected non-nil result") } @@ -322,11 +322,11 @@ func TestConvertPrincipalMapWithNilValues(t *testing.T) { func TestConvertToStringUnsupportedType(t *testing.T) { // Test that unsupported types (e.g., nested maps/slices) return empty string // This should trigger a warning log and return an error - + type customStruct struct { Field string } - + testCases := []struct { name string input interface{} @@ -494,7 +494,7 @@ func TestConvertPrincipalEmptyStrings(t *testing.T) { func TestConvertStatementWithUnsupportedFields(t *testing.T) { // Test that errors are returned for unsupported fields // These fields are critical for policy semantics and ignoring them would be a security risk - + testCases := []struct { name string statement *policy.Statement @@ -544,7 +544,7 @@ func TestConvertStatementWithUnsupportedFields(t *testing.T) { } else if !strings.Contains(err.Error(), tc.wantError) { t.Errorf("Expected error containing %q, got: %v", tc.wantError, err) } - + // Verify zero-value struct is returned on error if result.Sid != "" || result.Effect != "" { t.Error("Expected zero-value struct on error") @@ -611,4 +611,3 @@ func TestConvertPolicyDocumentWithId(t *testing.T) { t.Errorf("Expected 1 statement, got %d", len(dest.Statement)) } } - diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 757fcf77f..e4c0ad77b 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -39,13 +39,13 @@ const ( AmzObjectTaggingDirective = "X-Amz-Tagging-Directive" AmzTagCount = "x-amz-tagging-count" - SeaweedFSIsDirectoryKey = "X-Seaweedfs-Is-Directory-Key" - SeaweedFSPartNumber = "X-Seaweedfs-Part-Number" - SeaweedFSUploadId = "X-Seaweedfs-Upload-Id" - SeaweedFSMultipartPartsCount = "X-Seaweedfs-Multipart-Parts-Count" + SeaweedFSIsDirectoryKey = "X-Seaweedfs-Is-Directory-Key" + SeaweedFSPartNumber = "X-Seaweedfs-Part-Number" + SeaweedFSUploadId = "X-Seaweedfs-Upload-Id" + SeaweedFSMultipartPartsCount = "X-Seaweedfs-Multipart-Parts-Count" SeaweedFSMultipartPartBoundaries = "X-Seaweedfs-Multipart-Part-Boundaries" // JSON: [{part:1,start:0,end:2,etag:"abc"},{part:2,start:2,end:3,etag:"def"}] - SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3" - AmzMpPartsCount = "x-amz-mp-parts-count" + SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3" + AmzMpPartsCount = "x-amz-mp-parts-count" // S3 ACL headers AmzCannedAcl = "X-Amz-Acl" diff --git a/weed/s3api/s3_sse_s3_multipart_test.go b/weed/s3api/s3_sse_s3_multipart_test.go index dda930ebd..88f20d0e9 100644 --- a/weed/s3api/s3_sse_s3_multipart_test.go +++ b/weed/s3api/s3_sse_s3_multipart_test.go @@ -20,7 +20,7 @@ func TestSSES3MultipartChunkViewDecryption(t *testing.T) { // Create test plaintext plaintext := []byte("This is test data for SSE-S3 multipart encryption testing") - + // Simulate multipart upload with 2 parts at different offsets testCases := []struct { name string @@ -217,7 +217,7 @@ func TestSSES3ChunkMetadataDetection(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { hasPerChunkMetadata := tc.chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(tc.chunk.GetSseMetadata()) > 0 - + if hasPerChunkMetadata != tc.expectedMultipart { t.Errorf("Expected multipart=%v, got hasPerChunkMetadata=%v", tc.expectedMultipart, hasPerChunkMetadata) } @@ -264,4 +264,3 @@ func TestSSES3EncryptionConsistency(t *testing.T) { t.Error("Second decryption should also work with fresh stream") } } - diff --git a/weed/s3api/s3api_bucket_policy_arn_test.go b/weed/s3api/s3api_bucket_policy_arn_test.go index ef8946918..7e25afba6 100644 --- a/weed/s3api/s3api_bucket_policy_arn_test.go +++ b/weed/s3api/s3api_bucket_policy_arn_test.go @@ -2,7 +2,7 @@ package s3api import ( "testing" - + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" ) @@ -123,4 +123,3 @@ func TestBuildPrincipalARN(t *testing.T) { }) } } - diff --git a/weed/s3api/s3api_bucket_policy_engine.go b/weed/s3api/s3api_bucket_policy_engine.go index 278e3e1ae..fc674e12f 100644 --- a/weed/s3api/s3api_bucket_policy_engine.go +++ b/weed/s3api/s3api_bucket_policy_engine.go @@ -64,7 +64,7 @@ func (bpe *BucketPolicyEngine) LoadBucketPolicyFromCache(bucket string, policyDo glog.Errorf("Failed to convert bucket policy for %s: %v", bucket, err) return fmt.Errorf("failed to convert bucket policy: %w", err) } - + // Marshal the converted policy to JSON for storage in the engine policyJSON, err := json.Marshal(enginePolicyDoc) if err != nil { @@ -152,7 +152,7 @@ func (bpe *BucketPolicyEngine) EvaluatePolicyWithContext(bucket, object, action, // Build resource ARN resource := buildResourceARN(bucket, object) - glog.V(4).Infof("EvaluatePolicyWithContext: bucket=%s, resource=%s, action=%s (from %s), principal=%s", + glog.V(4).Infof("EvaluatePolicyWithContext: bucket=%s, resource=%s, action=%s (from %s), principal=%s", bucket, resource, s3Action, action, principal) // Evaluate using the policy engine diff --git a/weed/s3api/s3api_implicit_directory_test.go b/weed/s3api/s3api_implicit_directory_test.go index e4e9f5821..e7c3633fc 100644 --- a/weed/s3api/s3api_implicit_directory_test.go +++ b/weed/s3api/s3api_implicit_directory_test.go @@ -122,10 +122,10 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) { // } // } // } - + isZeroByteFile := tt.fileSize == 0 && !tt.isDirectory isActualDirectory := tt.isDirectory - + shouldReturn404 := false if !tt.versioningEnabled && !tt.hasTrailingSlash { if isZeroByteFile || isActualDirectory { @@ -134,7 +134,7 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) { } } } - + if shouldReturn404 != tt.shouldReturn404 { t.Errorf("Logic mismatch for %s:\n Expected shouldReturn404=%v\n Got shouldReturn404=%v\n Description: %s", tt.name, tt.shouldReturn404, shouldReturn404, tt.description) @@ -180,9 +180,9 @@ func TestHasChildrenLogic(t *testing.T) { description: "Should return false when no children exist (EOF)", }, { - name: "Directory with leading slash in prefix", - bucket: "test-bucket", - prefix: "/dataset", + name: "Directory with leading slash in prefix", + bucket: "test-bucket", + prefix: "/dataset", listResponse: &filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ Name: "file.parquet", @@ -202,14 +202,14 @@ func TestHasChildrenLogic(t *testing.T) { // 2. It should list with Limit=1 // 3. It should return true if any entry is received // 4. It should return false if EOF is received - + hasChildren := false if tt.listError == nil && tt.listResponse != nil { hasChildren = true } else if tt.listError == io.EOF { hasChildren = false } - + if hasChildren != tt.expectedResult { t.Errorf("hasChildren logic mismatch for %s:\n Expected: %v\n Got: %v\n Description: %s", tt.name, tt.expectedResult, hasChildren, tt.description) @@ -273,7 +273,7 @@ func TestImplicitDirectoryIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - + t.Skip("Integration test - run manually with: cd test/s3/parquet && make test-implicit-dir-with-server") } @@ -283,4 +283,3 @@ func BenchmarkHasChildrenCheck(b *testing.B) { // Expected: ~1-5ms per call (one gRPC LIST request with Limit=1) b.Skip("Benchmark - requires full filer setup") } - diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 1e5171ce5..86a7bc74b 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -513,7 +513,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req if endOffset >= startOffset { partSize = uint64(endOffset - startOffset + 1) } - + dstEntry := &filer_pb.Entry{ Attributes: &filer_pb.FuseAttributes{ FileSize: partSize, diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 07d484d80..3ea709b31 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -380,12 +380,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV)) } else { - // Check if this upload uses SSE-S3 - if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil { - glog.Errorf("Failed to setup SSE-S3 multipart headers: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Check if this upload uses SSE-S3 + if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil { + glog.Errorf("Failed to setup SSE-S3 multipart headers: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } } } } else if !errors.Is(err, filer_pb.ErrNotFound) { diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index b3738f1ec..b9a249923 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -372,7 +372,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader }) if err != nil { glog.Errorf("putToFiler: chunked upload failed: %v", err) - + // CRITICAL: Cleanup orphaned chunks before returning error // UploadReaderInChunks now returns partial results even on error, // allowing us to cleanup any chunks that were successfully uploaded @@ -381,7 +381,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader glog.Warningf("putToFiler: Upload failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks)) s3a.deleteOrphanedChunks(chunkResult.FileChunks) } - + if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) { return "", s3err.ErrInvalidDigest, SSEResponseMetadata{} } @@ -432,7 +432,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader // SSE-KMS: Create per-chunk metadata with chunk-specific offsets // Each chunk needs its own metadata with ChunkOffset set for proper IV calculation during decryption chunk.SseType = filer_pb.SSEType_SSE_KMS - + // Create a copy of the SSE-KMS key with chunk-specific offset chunkSSEKey := &SSEKMSKey{ KeyID: sseKMSKey.KeyID, @@ -442,7 +442,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader IV: sseKMSKey.IV, ChunkOffset: chunk.Offset, // Set chunk-specific offset for IV calculation } - + // Serialize per-chunk metadata if chunkMetadata, serErr := SerializeSSEKMSMetadata(chunkSSEKey); serErr == nil { chunk.SseMetadata = chunkMetadata @@ -453,10 +453,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader // SSE-S3: Create per-chunk metadata with chunk-specific IVs // Each chunk needs its own IV calculated from the base IV + chunk offset chunk.SseType = filer_pb.SSEType_SSE_S3 - + // Calculate chunk-specific IV using base IV and chunk offset chunkIV, _ := calculateIVWithOffset(sseS3Key.IV, chunk.Offset) - + // Create a copy of the SSE-S3 key with chunk-specific IV chunkSSEKey := &SSES3Key{ Key: sseS3Key.Key, @@ -464,7 +464,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader Algorithm: sseS3Key.Algorithm, IV: chunkIV, // Use chunk-specific IV } - + // Serialize per-chunk metadata if chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey); serErr == nil { chunk.SseMetadata = chunkMetadata diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 976ed2da9..b9c4eb3fc 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -90,7 +90,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize bucket policy engine first policyEngine := NewBucketPolicyEngine() - + s3ApiServer = &S3ApiServer{ option: option, iam: iam, @@ -171,7 +171,7 @@ func (s3a *S3ApiServer) syncBucketPolicyToEngine(bucket string, policyDoc *polic if s3a.policyEngine == nil { return } - + if policyDoc != nil { if err := s3a.policyEngine.LoadBucketPolicyFromCache(bucket, policyDoc); err != nil { glog.Errorf("Failed to sync bucket policy for %s to policy engine: %v", bucket, err) diff --git a/weed/s3api/s3api_sse_chunk_metadata_test.go b/weed/s3api/s3api_sse_chunk_metadata_test.go index d02502249..ca38f44f4 100644 --- a/weed/s3api/s3api_sse_chunk_metadata_test.go +++ b/weed/s3api/s3api_sse_chunk_metadata_test.go @@ -36,8 +36,8 @@ func TestSSEKMSChunkMetadataAssignment(t *testing.T) { // Simulate multi-chunk upload scenario (what putToFiler does after UploadReaderInChunks) simulatedChunks := []*filer_pb.FileChunk{ - {FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 0 - {FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB + {FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 0 + {FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB {FileId: "chunk3", Offset: 16 * 1024 * 1024, Size: 4 * 1024 * 1024}, // 4MB chunk at offset 16MB } @@ -110,7 +110,7 @@ func TestSSEKMSChunkMetadataAssignment(t *testing.T) { t.Errorf("Chunk %d: KeyID mismatch", i) } - t.Logf("✓ Chunk %d: metadata deserialized successfully (ChunkOffset=%d, KeyID=%s)", + t.Logf("✓ Chunk %d: metadata deserialized successfully (ChunkOffset=%d, KeyID=%s)", i, deserializedKey.ChunkOffset, deserializedKey.KeyID) } @@ -167,8 +167,8 @@ func TestSSES3ChunkMetadataAssignment(t *testing.T) { // Simulate multi-chunk upload scenario (what putToFiler does after UploadReaderInChunks) simulatedChunks := []*filer_pb.FileChunk{ - {FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 0 - {FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB + {FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 0 + {FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB {FileId: "chunk3", Offset: 16 * 1024 * 1024, Size: 4 * 1024 * 1024}, // 4MB chunk at offset 16MB } @@ -359,4 +359,3 @@ func TestSSEChunkMetadataComparison(t *testing.T) { } }) } - diff --git a/weed/s3api/s3api_sse_decrypt_test.go b/weed/s3api/s3api_sse_decrypt_test.go index 043271bf4..f66a89ebd 100644 --- a/weed/s3api/s3api_sse_decrypt_test.go +++ b/weed/s3api/s3api_sse_decrypt_test.go @@ -73,12 +73,12 @@ func TestSSECDecryptChunkView_NoOffsetAdjustment(t *testing.T) { if err != nil { t.Fatalf("Failed to create decrypted reader (wrong): %v", err) } - + // Skip ivSkip bytes (as the buggy code would do) if ivSkip > 0 { io.CopyN(io.Discard, decryptedReaderWrong, int64(ivSkip)) } - + decryptedWrong, err := io.ReadAll(decryptedReaderWrong) if err != nil { t.Fatalf("Failed to read decrypted data (wrong): %v", err) @@ -107,7 +107,7 @@ func TestSSECDecryptChunkView_NoOffsetAdjustment(t *testing.T) { func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) { // Setup: Create test data plaintext := []byte("This is a test message for SSE-KMS decryption with offset adjustment") - + // Generate base IV and key baseIV := make([]byte, aes.BlockSize) key := make([]byte, 32) @@ -126,10 +126,10 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) { if err != nil { t.Fatalf("Failed to create cipher: %v", err) } - + ciphertext := make([]byte, len(plaintext)) stream := cipher.NewCTR(block, adjustedIV) - + // Skip ivSkip bytes in the encryption stream if needed if ivSkip > 0 { dummy := make([]byte, ivSkip) @@ -143,10 +143,10 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) { if err != nil { t.Fatalf("Failed to create cipher for decryption: %v", err) } - + decrypted := make([]byte, len(ciphertext)) streamDecrypt := cipher.NewCTR(blockDecrypt, adjustedIVDecrypt) - + // Skip ivSkip bytes in the decryption stream if ivSkipDecrypt > 0 { dummy := make([]byte, ivSkipDecrypt) @@ -166,7 +166,7 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) { if err != nil { t.Fatalf("Failed to create cipher for wrong decryption: %v", err) } - + decryptedWrong := make([]byte, len(ciphertext)) streamWrong := cipher.NewCTR(blockWrong, baseIV) // Use base IV directly - WRONG for SSE-KMS streamWrong.XORKeyStream(decryptedWrong, ciphertext) @@ -184,7 +184,6 @@ func TestSSEDecryptionDifferences(t *testing.T) { t.Log("SSE-C: Random IV per part → Use stored IV DIRECTLY (no offset)") t.Log("SSE-KMS: Base IV + offset → MUST call calculateIVWithOffset(baseIV, offset)") t.Log("SSE-S3: Base IV + offset → Stores ADJUSTED IV, use directly") - + // This test documents the critical differences and serves as executable documentation } - diff --git a/weed/s3api/s3api_sse_s3_upload_test.go b/weed/s3api/s3api_sse_s3_upload_test.go index a09104548..e349b9333 100644 --- a/weed/s3api/s3api_sse_s3_upload_test.go +++ b/weed/s3api/s3api_sse_s3_upload_test.go @@ -88,7 +88,7 @@ func TestSSES3MultipartUploadStoresDerivedIV(t *testing.T) { t.Fatalf("Failed to create cipher: %v", err) } stream := cipher.NewCTR(block, keyWithDerivedIV.IV) - + // Handle ivSkip for non-block-aligned offsets if ivSkip > 0 { skipDummy := make([]byte, ivSkip) @@ -136,7 +136,7 @@ func TestSSES3MultipartUploadStoresDerivedIV(t *testing.T) { // returned key contains the derived IV (not base IV). func TestHandleSSES3MultipartEncryptionFlow(t *testing.T) { // This test simulates what happens in a real multipart upload request - + // Generate test key manually (simulating a complete SSE-S3 key) keyBytes := make([]byte, 32) // 256-bit key if _, err := rand.Read(keyBytes); err != nil { @@ -209,9 +209,9 @@ func TestHandleSSES3MultipartEncryptionFlow(t *testing.T) { if err != nil { t.Fatalf("Failed to create cipher: %v", err) } - + stream := cipher.NewCTR(block, originalKey.IV) - + // Handle ivSkip for non-block-aligned offsets if ivSkip > 0 { skipDummy := make([]byte, ivSkip) @@ -255,4 +255,3 @@ func TestSSES3HeaderEncoding(t *testing.T) { s3_constants.AESBlockSize, len(decodedBaseIV)) } } - diff --git a/weed/util/log_buffer/log_buffer_corruption_test.go b/weed/util/log_buffer/log_buffer_corruption_test.go index c6f0b0591..2f7a029e6 100644 --- a/weed/util/log_buffer/log_buffer_corruption_test.go +++ b/weed/util/log_buffer/log_buffer_corruption_test.go @@ -15,32 +15,32 @@ import ( func TestReadTsCorruptedBuffer(t *testing.T) { // Create a corrupted buffer with invalid protobuf data buf := make([]byte, 100) - + // Set size field to 10 bytes (using proper encoding) util.Uint32toBytes(buf[0:4], 10) - + // Fill with garbage data that won't unmarshal as LogEntry for i := 4; i < 14; i++ { buf[i] = 0xFF } - + // Attempt to read timestamp size, ts, err := readTs(buf, 0) - + // Should return an error if err == nil { t.Error("Expected error for corrupted buffer, got nil") } - + // Size and ts should be zero on error if size != 0 { t.Errorf("Expected size=0 on error, got %d", size) } - + if ts != 0 { t.Errorf("Expected ts=0 on error, got %d", ts) } - + // Error should indicate corruption if !errors.Is(err, ErrBufferCorrupted) { t.Logf("Error message: %v", err) @@ -49,7 +49,7 @@ func TestReadTsCorruptedBuffer(t *testing.T) { t.Error("Expected non-empty error message") } } - + t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err) } @@ -60,42 +60,42 @@ func TestReadTsValidBuffer(t *testing.T) { TsNs: 123456789, Key: []byte("test-key"), } - + // Marshal it data, err := proto.Marshal(logEntry) if err != nil { t.Fatalf("Failed to marshal LogEntry: %v", err) } - + // Create buffer with size prefix using util function buf := make([]byte, 4+len(data)) util.Uint32toBytes(buf[0:4], uint32(len(data))) copy(buf[4:], data) - + // Read timestamp size, ts, err := readTs(buf, 0) - + // Should succeed if err != nil { t.Fatalf("Expected no error for valid buffer, got: %v", err) } - + // Should return correct values if size != len(data) { t.Errorf("Expected size=%d, got %d", len(data), size) } - + if ts != logEntry.TsNs { t.Errorf("Expected ts=%d, got %d", logEntry.TsNs, ts) } - + t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", size, ts) } // TestReadFromBufferCorruption tests that ReadFromBuffer propagates corruption errors func TestReadFromBufferCorruption(t *testing.T) { lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {}) - + // Add a valid entry first using AddDataToBuffer validKey := []byte("valid") validData, _ := proto.Marshal(&filer_pb.LogEntry{ @@ -105,7 +105,7 @@ func TestReadFromBufferCorruption(t *testing.T) { if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil { t.Fatalf("Failed to add data to buffer: %v", err) } - + // Manually corrupt the buffer by writing garbage // This simulates a corruption scenario if len(lb.idx) > 0 { @@ -115,11 +115,11 @@ func TestReadFromBufferCorruption(t *testing.T) { lb.buf[i] = 0xFF } } - + // Try to read - should detect corruption startPos := MessagePosition{Time: lb.startTime} buf, offset, err := lb.ReadFromBuffer(startPos) - + // Should return corruption error if err == nil { t.Error("Expected corruption error, got nil") @@ -133,7 +133,7 @@ func TestReadFromBufferCorruption(t *testing.T) { } t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err) } - + t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err) } @@ -144,18 +144,18 @@ func TestLocateByTsCorruption(t *testing.T) { buf: make([]byte, 100), size: 14, } - + // Set size field (using proper encoding) util.Uint32toBytes(mb.buf[0:4], 10) - + // Fill with garbage for i := 4; i < 14; i++ { mb.buf[i] = 0xFF } - + // Try to locate by timestamp pos, err := mb.locateByTs(mb.startTime) - + // Should return error if err == nil { t.Errorf("Expected corruption error, got nil (pos=%d)", pos) @@ -170,17 +170,17 @@ func TestErrorPropagationChain(t *testing.T) { // Already covered by TestReadTsCorruptedBuffer t.Log("✓ readTs error propagation tested") }) - + t.Run("Corruption in locateByTs", func(t *testing.T) { - // Already covered by TestLocateByTsCorruption + // Already covered by TestLocateByTsCorruption t.Log("✓ locateByTs error propagation tested") }) - + t.Run("Corruption in ReadFromBuffer binary search", func(t *testing.T) { // Already covered by TestReadFromBufferCorruption t.Log("✓ ReadFromBuffer error propagation tested") }) - + t.Log("✓ Complete error propagation chain verified") } @@ -203,18 +203,18 @@ func TestNoSilentCorruption(t *testing.T) { pos: 0, }, } - + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { size, ts, err := readTs(tc.buf, tc.pos) - + // CRITICAL: Must return error, never silent (0, 0) if err == nil { t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts) } else { t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err) } - + // On error, size and ts should be 0 if size != 0 || ts != 0 { t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts) @@ -222,4 +222,3 @@ func TestNoSilentCorruption(t *testing.T) { }) } } - diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 2c24a608c..d99a8f20c 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -141,16 +141,16 @@ func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) { // Add some data to the buffer if needed (at current offset position) if tt.hasData { - testData := []byte("test message") - // Use AddLogEntryToBuffer to preserve offset information - if err := lb.AddLogEntryToBuffer(&filer_pb.LogEntry{ - TsNs: time.Now().UnixNano(), - Key: []byte("key"), - Data: testData, - Offset: tt.currentOffset, // Add data at current offset - }); err != nil { - t.Fatalf("Failed to add log entry: %v", err) - } + testData := []byte("test message") + // Use AddLogEntryToBuffer to preserve offset information + if err := lb.AddLogEntryToBuffer(&filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: testData, + Offset: tt.currentOffset, // Add data at current offset + }); err != nil { + t.Fatalf("Failed to add log entry: %v", err) + } } // Create an offset-based position for the requested offset diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 4c9106a1a..0a2b8e89a 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -223,13 +223,13 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star } bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition) glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err) - + // Check for buffer corruption error before other error handling if err != nil && errors.Is(err, ErrBufferCorrupted) { glog.Errorf("%s: Buffer corruption detected: %v", readerName, err) return lastReadPosition, true, fmt.Errorf("buffer corruption: %w", err) } - + if err == ResumeFromDiskError { // Try to read from disk if readFromDiskFn is available if logBuffer.ReadFromDiskFn != nil {