
go fmt

Branch: pull/7481/head
Author: chrislu, 2 weeks ago
Commit: 74e864bc65
25 changed files (lines changed in parentheses):

 1. test/s3/sse/s3_sse_range_server_test.go (1)
 2. weed/operation/upload_chunked.go (167)
 3. weed/operation/upload_chunked_test.go (73)
 4. weed/pb/filer_pb/filer_pb_helper.go (2)
 5. weed/s3api/auth_credentials.go (94)
 6. weed/s3api/auth_credentials_subscribe.go (2)
 7. weed/s3api/custom_types.go (4)
 8. weed/s3api/filer_util.go (4)
 9. weed/s3api/policy_conversion.go (5)
10. weed/s3api/policy_conversion_test.go (27)
11. weed/s3api/s3_constants/header.go (12)
12. weed/s3api/s3_sse_s3_multipart_test.go (5)
13. weed/s3api/s3api_bucket_policy_arn_test.go (3)
14. weed/s3api/s3api_bucket_policy_engine.go (4)
15. weed/s3api/s3api_implicit_directory_test.go (19)
16. weed/s3api/s3api_object_handlers_copy.go (2)
17. weed/s3api/s3api_object_handlers_multipart.go (12)
18. weed/s3api/s3api_object_handlers_put.go (14)
19. weed/s3api/s3api_server.go (4)
20. weed/s3api/s3api_sse_chunk_metadata_test.go (11)
21. weed/s3api/s3api_sse_decrypt_test.go (19)
22. weed/s3api/s3api_sse_s3_upload_test.go (9)
23. weed/util/log_buffer/log_buffer_corruption_test.go (63)
24. weed/util/log_buffer/log_buffer_test.go (20)
25. weed/util/log_buffer/log_read.go (4)

test/s3/sse/s3_sse_range_server_test.go (1 line changed)

@@ -443,4 +443,3 @@ func TestSSEMultipartRangeRequestsServerBehavior(t *testing.T) {
assert.Equal(t, expectedData, bodyBytes,
"Cross-part range content must be correctly decrypted and assembled")
}

weed/operation/upload_chunked.go (167 lines changed)

@@ -26,16 +26,16 @@ type ChunkedUploadResult struct {
// ChunkedUploadOption contains options for chunked uploads
type ChunkedUploadOption struct {
	ChunkSize       int32
	SmallFileLimit  int64
	Collection      string
	Replication     string
	DataCenter      string
	SaveSmallInline bool
	Jwt             security.EncodedJwt
	MimeType        string
	AssignFunc      func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
	UploadFunc      func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
}
var chunkBufferPool = sync.Pool{
@@ -48,25 +48,25 @@ var chunkBufferPool = sync.Pool{
// This prevents OOM by processing the stream in fixed-size chunks
// Returns file chunks, MD5 hash, total size, and any small content stored inline
func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) {
md5Hash := md5.New()
var partReader = io.TeeReader(reader, md5Hash)
var fileChunks []*filer_pb.FileChunk
var fileChunksLock sync.Mutex
var uploadErr error
var uploadErrLock sync.Mutex
var chunkOffset int64 = 0
var wg sync.WaitGroup
const bytesBufferCounter = 4
bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
uploadLoop:
for {
// Throttle buffer usage
bytesBufferLimitChan <- struct{}{}
// Check for errors from parallel uploads
uploadErrLock.Lock()
if uploadErr != nil {
@@ -75,7 +75,7 @@ uploadLoop:
break
}
uploadErrLock.Unlock()
// Check for context cancellation
select {
case <-ctx.Done():
@@ -88,12 +88,12 @@ uploadLoop:
break uploadLoop
default:
}
// Get buffer from pool
bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer)
limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize))
bytesBuffer.Reset()
// Read one chunk
dataSize, err := bytesBuffer.ReadFrom(limitedReader)
if err != nil {
@@ -120,26 +120,26 @@ uploadLoop:
// which is valid (e.g., touch command creates 0-byte files)
break
}
// For small files at offset 0, store inline instead of uploading
if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit {
	smallContent := make([]byte, dataSize)
	n, readErr := io.ReadFull(bytesBuffer, smallContent)
	chunkBufferPool.Put(bytesBuffer)
	<-bytesBufferLimitChan
	if readErr != nil {
		return nil, fmt.Errorf("failed to read small content: read %d of %d bytes: %w", n, dataSize, readErr)
	}
	return &ChunkedUploadResult{
		FileChunks:   nil,
		Md5Hash:      md5Hash,
		TotalSize:    dataSize,
		SmallContent: smallContent,
	}, nil
}
// Upload chunk in parallel goroutine
wg.Add(1)
go func(offset int64, buf *bytes.Buffer) {
@@ -148,7 +148,7 @@ uploadLoop:
<-bytesBufferLimitChan
wg.Done()
}()
// Assign volume for this chunk
_, assignResult, assignErr := opt.AssignFunc(ctx, 1)
if assignErr != nil {
@@ -159,29 +159,29 @@ uploadLoop:
uploadErrLock.Unlock()
return
}
// Upload chunk data
uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
// Use per-assignment JWT if present, otherwise fall back to the original JWT
// This is critical for secured clusters where each volume assignment has its own JWT
jwt := opt.Jwt
if assignResult.Auth != "" {
	jwt = assignResult.Auth
}
uploadOption := &UploadOption{
	UploadUrl:         uploadUrl,
	Cipher:            false,
	IsInputCompressed: false,
	MimeType:          opt.MimeType,
	PairMap:           nil,
	Jwt:               jwt,
}
var uploadResult *UploadResult
var uploadResultErr error
// Use mock upload function if provided (for testing), otherwise use real uploader
if opt.UploadFunc != nil {
uploadResult, uploadResultErr = opt.UploadFunc(ctx, buf.Bytes(), uploadOption)
@@ -197,7 +197,7 @@ uploadLoop:
}
uploadResult, uploadResultErr = uploader.UploadData(ctx, buf.Bytes(), uploadOption)
}
if uploadResultErr != nil {
uploadErrLock.Lock()
if uploadErr == nil {
@@ -206,46 +206,46 @@ uploadLoop:
uploadErrLock.Unlock()
return
}
// Create chunk entry
// Set ModifiedTsNs to current time (nanoseconds) to track when upload completed
// This is critical for multipart uploads where the same part may be uploaded multiple times
// The part with the latest ModifiedTsNs is selected as the authoritative version
fid, _ := filer_pb.ToFileIdObject(assignResult.Fid)
chunk := &filer_pb.FileChunk{
	FileId:       assignResult.Fid,
	Offset:       offset,
	Size:         uint64(uploadResult.Size),
	ModifiedTsNs: time.Now().UnixNano(),
	ETag:         uploadResult.ContentMd5,
	Fid:          fid,
	CipherKey:    uploadResult.CipherKey,
}
fileChunksLock.Lock()
fileChunks = append(fileChunks, chunk)
glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
fileChunksLock.Unlock()
}(chunkOffset, bytesBuffer)
// Update offset for next chunk
chunkOffset += dataSize
// If this was a partial chunk, we're done
if dataSize < int64(opt.ChunkSize) {
break
}
}
// Wait for all uploads to complete
wg.Wait()
// Sort chunks by offset (do this even if there's an error, for cleanup purposes)
sort.Slice(fileChunks, func(i, j int) bool {
return fileChunks[i].Offset < fileChunks[j].Offset
})
// Check for errors - return partial results for cleanup
if uploadErr != nil {
glog.Errorf("chunked upload failed: %v (returning %d partial chunks for cleanup)", uploadErr, len(fileChunks))
@@ -257,7 +257,7 @@ uploadLoop:
SmallContent: nil,
}, uploadErr
}
return &ChunkedUploadResult{
FileChunks: fileChunks,
Md5Hash: md5Hash,
@@ -265,4 +265,3 @@ uploadLoop:
SmallContent: nil,
}, nil
}
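
UploadReaderInChunks now returns its partial results alongside the error, so a caller can delete already-uploaded chunks instead of orphaning them. A minimal caller-side sketch (deleteChunks is a hypothetical cleanup helper; the PR's actual cleanup path is s3a.deleteOrphanedChunks in putToFiler, shown further below):

	result, err := UploadReaderInChunks(ctx, reader, opt)
	if err != nil {
		// result is non-nil even on failure, so chunks that did upload
		// successfully can still be cleaned up
		if result != nil && len(result.FileChunks) > 0 {
			deleteChunks(result.FileChunks) // hypothetical helper
		}
		return err
	}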

weed/operation/upload_chunked_test.go (73 lines changed)

@@ -16,13 +16,13 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
// Create test data larger than one chunk to force multiple chunk uploads
testData := bytes.Repeat([]byte("test data for chunk upload failure testing"), 1000) // ~40KB
reader := bytes.NewReader(testData)
uploadAttempts := 0
// Create a mock assign function that succeeds for first chunk, then fails
assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
uploadAttempts++
if uploadAttempts == 1 {
// First chunk succeeds
return nil, &AssignResult{
@@ -32,11 +32,11 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
Count: 1,
}, nil
}
// Second chunk fails (simulating volume server down or network error)
return nil, nil, errors.New("simulated volume assignment failure")
}
// Mock upload function that simulates successful upload
uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
return &UploadResult{
@@ -46,7 +46,7 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
Error: "",
}, nil
}
// Attempt upload with small chunk size to trigger multiple uploads
result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
ChunkSize: 8 * 1024, // 8KB chunks
@@ -57,32 +57,32 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
AssignFunc: assignFunc,
UploadFunc: uploadFunc,
})
// VERIFICATION 1: Error should be returned
if err == nil {
t.Fatal("Expected error from UploadReaderInChunks, got nil")
}
t.Logf("✓ Got expected error: %v", err)
// VERIFICATION 2: Result should NOT be nil (this is the fix)
if result == nil {
t.Fatal("CRITICAL: UploadReaderInChunks returned nil result on error - caller cannot cleanup orphaned chunks!")
}
t.Log("✓ Result is not nil (partial results returned)")
// VERIFICATION 3: Result should contain partial chunks from successful uploads
// Note: In reality, the first chunk upload would succeed before assignment fails for chunk 2
// But in this test, assignment fails immediately for chunk 2, so we may have 0 chunks
// The important thing is that the result struct is returned, not that it has chunks
t.Logf("✓ Result contains %d chunks (may be 0 if all assignments failed)", len(result.FileChunks))
// VERIFICATION 4: MD5 hash should be available even on partial failure
if result.Md5Hash == nil {
t.Error("Expected Md5Hash to be non-nil")
} else {
t.Log("✓ Md5Hash is available for partial data")
}
// VERIFICATION 5: TotalSize should reflect bytes read before failure
if result.TotalSize < 0 {
t.Errorf("Expected non-negative TotalSize, got %d", result.TotalSize)
@@ -95,7 +95,7 @@ func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
func TestUploadReaderInChunksSuccessPath(t *testing.T) {
testData := []byte("small test data")
reader := bytes.NewReader(testData)
// Mock assign function that always succeeds
assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
return nil, &AssignResult{
@@ -105,7 +105,7 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) {
Count: 1,
}, nil
}
// Mock upload function that simulates successful upload
uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
return &UploadResult{
@@ -115,7 +115,7 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) {
Error: "",
}, nil
}
result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
ChunkSize: 8 * 1024,
SmallFileLimit: 256,
@@ -125,40 +125,40 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) {
AssignFunc: assignFunc,
UploadFunc: uploadFunc,
})
// VERIFICATION 1: No error should occur
if err != nil {
t.Fatalf("Expected successful upload, got error: %v", err)
}
t.Log("✓ Upload completed without error")
// VERIFICATION 2: Result should not be nil
if result == nil {
t.Fatal("Expected non-nil result")
}
t.Log("✓ Result is not nil")
// VERIFICATION 3: Should have file chunks
if len(result.FileChunks) == 0 {
t.Error("Expected at least one file chunk")
} else {
t.Logf("✓ Result contains %d file chunk(s)", len(result.FileChunks))
}
// VERIFICATION 4: Total size should match input data
if result.TotalSize != int64(len(testData)) {
t.Errorf("Expected TotalSize=%d, got %d", len(testData), result.TotalSize)
} else {
t.Logf("✓ TotalSize=%d matches input data", result.TotalSize)
}
// VERIFICATION 5: MD5 hash should be available
if result.Md5Hash == nil {
t.Error("Expected non-nil Md5Hash")
} else {
t.Log("✓ Md5Hash is available")
}
// VERIFICATION 6: Chunk should have expected properties
if len(result.FileChunks) > 0 {
chunk := result.FileChunks[0]
@@ -180,13 +180,13 @@ func TestUploadReaderInChunksSuccessPath(t *testing.T) {
func TestUploadReaderInChunksContextCancellation(t *testing.T) {
testData := bytes.Repeat([]byte("test data"), 10000) // ~80KB
reader := bytes.NewReader(testData)
// Create a context that we'll cancel
ctx, cancel := context.WithCancel(context.Background())
// Cancel immediately to trigger cancellation handling
cancel()
assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
return nil, &AssignResult{
Fid: "test-fid,1234",
@@ -195,7 +195,7 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) {
Count: 1,
}, nil
}
// Mock upload function that simulates successful upload
uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
return &UploadResult{
@@ -205,7 +205,7 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) {
Error: "",
}, nil
}
result, err := UploadReaderInChunks(ctx, reader, &ChunkedUploadOption{
ChunkSize: 8 * 1024,
SmallFileLimit: 256,
@@ -215,12 +215,12 @@ func TestUploadReaderInChunksContextCancellation(t *testing.T) {
AssignFunc: assignFunc,
UploadFunc: uploadFunc,
})
// Should get context cancelled error
if err == nil {
t.Error("Expected context cancellation error")
}
// Should still get partial results for cleanup
if result == nil {
t.Error("Expected non-nil result even on context cancellation")
@@ -240,7 +240,7 @@ func (m *mockFailingReader) Read(p []byte) (n int, err error) {
if m.pos >= m.failAfter {
return 0, errors.New("simulated read failure")
}
remaining := m.failAfter - m.pos
toRead := len(p)
if toRead > remaining {
@@ -249,11 +249,11 @@ func (m *mockFailingReader) Read(p []byte) (n int, err error) {
if toRead > len(m.data)-m.pos {
toRead = len(m.data) - m.pos
}
if toRead == 0 {
return 0, io.EOF
}
copy(p, m.data[m.pos:m.pos+toRead])
m.pos += toRead
return toRead, nil
@@ -267,7 +267,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) {
pos: 0,
failAfter: 10000, // Fail after 10KB
}
assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
return nil, &AssignResult{
Fid: "test-fid,1234",
@@ -276,7 +276,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) {
Count: 1,
}, nil
}
// Mock upload function that simulates successful upload
uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
return &UploadResult{
@@ -286,7 +286,7 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) {
Error: "",
}, nil
}
result, err := UploadReaderInChunks(context.Background(), failingReader, &ChunkedUploadOption{
ChunkSize: 8 * 1024, // 8KB chunks
SmallFileLimit: 256,
@@ -296,18 +296,17 @@ func TestUploadReaderInChunksReaderFailure(t *testing.T) {
AssignFunc: assignFunc,
UploadFunc: uploadFunc,
})
// Should get read error
if err == nil {
t.Error("Expected read failure error")
}
// Should still get partial results
if result == nil {
t.Fatal("Expected non-nil result on read failure")
}
t.Logf("✓ Got partial result on read failure: chunks=%d, totalSize=%d",
len(result.FileChunks), result.TotalSize)
}

weed/pb/filer_pb/filer_pb_helper.go (2 lines changed)

@@ -39,7 +39,7 @@ func (entry *Entry) GetExpiryTime() (expiryTime int64) {
return expiryTime
}
}
// Regular TTL expiration: base on creation time only
expiryTime = entry.Attributes.Crtime + int64(entry.Attributes.TtlSec)
return expiryTime

weed/s3api/auth_credentials.go (94 lines changed)

@@ -53,7 +53,7 @@ type IdentityAccessManagement struct {
// IAM Integration for advanced features
iamIntegration *S3IAMIntegration
// Bucket policy engine for evaluating bucket policies
policyEngine *BucketPolicyEngine
}
@@ -177,41 +177,41 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
if accessKeyId != "" && secretAccessKey != "" {
	glog.V(1).Infof("No S3 configuration found, using AWS environment variables as fallback")
	// Create environment variable identity name
	identityNameSuffix := accessKeyId
	if len(accessKeyId) > 8 {
		identityNameSuffix = accessKeyId[:8]
	}
	// Create admin identity with environment variable credentials
	envIdentity := &Identity{
		Name:    "admin-" + identityNameSuffix,
		Account: &AccountAdmin,
		Credentials: []*Credential{
			{
				AccessKey: accessKeyId,
				SecretKey: secretAccessKey,
			},
		},
		Actions: []Action{
			s3_constants.ACTION_ADMIN,
		},
	}
	// Set as the only configuration
	iam.m.Lock()
	if len(iam.identities) == 0 {
		iam.identities = []*Identity{envIdentity}
		iam.accessKeyIdent = map[string]*Identity{accessKeyId: envIdentity}
		iam.isAuthEnabled = true
	}
	iam.m.Unlock()
	glog.V(1).Infof("Added admin identity from AWS environment variables: %s", envIdentity.Name)
}
}
return iam
@@ -460,13 +460,13 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
case authTypeJWT:
glog.V(3).Infof("jwt auth type detected, iamIntegration != nil? %t", iam.iamIntegration != nil)
r.Header.Set(s3_constants.AmzAuthType, "Jwt")
if iam.iamIntegration != nil {
	identity, s3Err = iam.authenticateJWTWithIAM(r)
	authType = "Jwt"
} else {
	glog.V(2).Infof("IAM integration is nil, returning ErrNotImplemented")
	return identity, s3err.ErrNotImplemented
}
case authTypeAnonymous:
authType = "Anonymous"
if identity, found = iam.lookupAnonymous(); !found {
@@ -501,7 +501,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
// For ListBuckets, authorization is performed in the handler by iterating
// through buckets and checking permissions for each. Skip the global check here.
policyAllows := false
if action == s3_constants.ACTION_LIST && bucket == "" {
// ListBuckets operation - authorization handled per-bucket in the handler
} else {
@@ -515,7 +515,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
principal := buildPrincipalARN(identity)
// Use context-aware policy evaluation to get the correct S3 action
allowed, evaluated, err := iam.policyEngine.EvaluatePolicyWithContext(bucket, object, string(action), principal, r)
if err != nil {
// SECURITY: Fail-close on policy evaluation errors
// If we can't evaluate the policy, deny access rather than falling through to IAM
@@ -537,7 +537,7 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
}
// If not evaluated (no policy or no matching statements), fall through to IAM/identity checks
}
// Only check IAM if bucket policy didn't explicitly allow
// This ensures bucket policies can independently grant access (AWS semantics)
if !policyAllows {
@@ -617,26 +617,26 @@ func buildPrincipalARN(identity *Identity) string {
if identity == nil {
return "*" // Anonymous
}
// Check if this is the anonymous user identity (authenticated as anonymous)
// S3 policies expect Principal: "*" for anonymous access
if identity.Name == s3_constants.AccountAnonymousId ||
	(identity.Account != nil && identity.Account.Id == s3_constants.AccountAnonymousId) {
return "*" // Anonymous user
}
// Build an AWS-compatible principal ARN
// Format: arn:aws:iam::account-id:user/user-name
accountId := identity.Account.Id
if accountId == "" {
accountId = "000000000000" // Default account ID
}
userName := identity.Name
if userName == "" {
userName = "unknown"
}
return fmt.Sprintf("arn:aws:iam::%s:user/%s", accountId, userName)
}
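
Concretely, buildPrincipalARN maps identities like this (the outcomes follow directly from the function above; the Identity literal is illustrative):

	buildPrincipalARN(nil) // "*" (anonymous)
	alice := &Identity{Name: "alice", Account: &Account{Id: "123456789012"}}
	buildPrincipalARN(alice) // "arn:aws:iam::123456789012:user/alice"
	// an empty Account.Id falls back to the default account ID:
	// "arn:aws:iam::000000000000:user/<name>"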

weed/s3api/auth_credentials_subscribe.go (2 lines changed)

@@ -145,7 +145,7 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry)
} else {
glog.V(3).Infof("updateBucketConfigCacheFromEntry: no Object Lock configuration found for bucket %s", bucket)
}
// Load bucket policy if present (for performance optimization)
config.BucketPolicy = loadBucketPolicyFromExtended(entry, bucket)
}

weed/s3api/custom_types.go (4 lines changed)

@@ -10,6 +10,6 @@ const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00"
// ConditionalHeaderResult holds the result of conditional header checking
type ConditionalHeaderResult struct {
ErrorCode s3err.ErrorCode
ETag      string          // ETag of the object (for 304 responses)
Entry     *filer_pb.Entry // Entry fetched during conditional check (nil if not fetched or object doesn't exist)
}

weed/s3api/filer_util.go (4 lines changed)

@@ -137,9 +137,9 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3
}
// processDirectoryTTL processes a single directory in paginated batches
func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error {
const batchSize = filer.PaginationSize
startFrom := ""

weed/s3api/policy_conversion.go (5 lines changed)

@@ -140,13 +140,13 @@ func convertPrincipal(principal interface{}) (*policy_engine.StringOrStringSlice
// Handle AWS-style principal with service/user keys
// Example: {"AWS": "arn:aws:iam::123456789012:user/Alice"}
// Only AWS principals are supported for now. Other types like Service or Federated need special handling.
awsPrincipals, ok := p["AWS"]
if !ok || len(p) != 1 {
glog.Warningf("unsupported principal map, only a single 'AWS' key is supported: %v", p)
return nil, fmt.Errorf("unsupported principal map, only a single 'AWS' key is supported, got keys: %v", getMapKeys(p))
}
// Recursively convert the AWS principal value
res, err := convertPrincipal(awsPrincipals)
if err != nil {
@@ -236,4 +236,3 @@ func getMapKeys(m map[string]interface{}) []string {
}
return keys
}

weed/s3api/policy_conversion_test.go (27 lines changed)

@@ -13,10 +13,10 @@ func TestConvertPolicyDocumentWithMixedTypes(t *testing.T) {
Version: "2012-10-17",
Statement: []policy.Statement{
{
Sid:       "TestMixedTypes",
Effect:    "Allow",
Action:    []string{"s3:GetObject"},
Resource:  []string{"arn:aws:s3:::bucket/*"},
Principal: []interface{}{"user1", 123, true}, // Mixed types
Condition: map[string]map[string]interface{}{
"NumericEquals": {
@@ -90,7 +90,7 @@ func TestConvertPolicyDocumentWithMixedTypes(t *testing.T) {
}
}
// Check StringEquals condition
stringCond, ok := stmt.Condition["StringEquals"]
if !ok {
t.Fatal("Expected StringEquals condition")
@@ -116,7 +116,7 @@ func TestConvertPrincipalWithMapAndMixedTypes(t *testing.T) {
principalMap := map[string]interface{}{
"AWS": []interface{}{
"arn:aws:iam::123456789012:user/Alice",
456,  // User ID as number
true, // Some boolean value
},
}
@@ -125,7 +125,7 @@ func TestConvertPrincipalWithMapAndMixedTypes(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if result == nil {
t.Fatal("Expected non-nil result")
}
@@ -230,7 +230,7 @@ func TestConvertPrincipalWithNilValues(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if result == nil {
t.Fatal("Expected non-nil result")
}
@@ -296,7 +296,7 @@ func TestConvertPrincipalMapWithNilValues(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if result == nil {
t.Fatal("Expected non-nil result")
}
@@ -322,11 +322,11 @@ func TestConvertPrincipalMapWithNilValues(t *testing.T) {
func TestConvertToStringUnsupportedType(t *testing.T) {
// Test that unsupported types (e.g., nested maps/slices) return empty string
// This should trigger a warning log and return an error
type customStruct struct {
Field string
}
testCases := []struct {
name string
input interface{}
@@ -494,7 +494,7 @@ func TestConvertPrincipalEmptyStrings(t *testing.T) {
func TestConvertStatementWithUnsupportedFields(t *testing.T) {
// Test that errors are returned for unsupported fields
// These fields are critical for policy semantics and ignoring them would be a security risk
testCases := []struct {
name string
statement *policy.Statement
@@ -544,7 +544,7 @@ func TestConvertStatementWithUnsupportedFields(t *testing.T) {
} else if !strings.Contains(err.Error(), tc.wantError) {
t.Errorf("Expected error containing %q, got: %v", tc.wantError, err)
}
// Verify zero-value struct is returned on error
if result.Sid != "" || result.Effect != "" {
t.Error("Expected zero-value struct on error")
@@ -611,4 +611,3 @@ func TestConvertPolicyDocumentWithId(t *testing.T) {
t.Errorf("Expected 1 statement, got %d", len(dest.Statement))
}
}

weed/s3api/s3_constants/header.go (12 lines changed)

@@ -39,13 +39,13 @@ const (
AmzObjectTaggingDirective = "X-Amz-Tagging-Directive"
AmzTagCount = "x-amz-tagging-count"
SeaweedFSIsDirectoryKey          = "X-Seaweedfs-Is-Directory-Key"
SeaweedFSPartNumber              = "X-Seaweedfs-Part-Number"
SeaweedFSUploadId                = "X-Seaweedfs-Upload-Id"
SeaweedFSMultipartPartsCount     = "X-Seaweedfs-Multipart-Parts-Count"
SeaweedFSMultipartPartBoundaries = "X-Seaweedfs-Multipart-Part-Boundaries" // JSON: [{part:1,start:0,end:2,etag:"abc"},{part:2,start:2,end:3,etag:"def"}]
SeaweedFSExpiresS3               = "X-Seaweedfs-Expires-S3"
AmzMpPartsCount                  = "x-amz-mp-parts-count"
// S3 ACL headers
AmzCannedAcl = "X-Amz-Acl"
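
The SeaweedFSMultipartPartBoundaries constant above documents its JSON payload inline; a hypothetical Go struct matching that shape (field names and the half-open chunk-index reading of start/end are assumptions, not part of this commit):

	type PartBoundary struct {
		Part  int    `json:"part"`  // 1-based part number
		Start int    `json:"start"` // index of the part's first chunk
		End   int    `json:"end"`   // one past the part's last chunk
		Etag  string `json:"etag"`  // ETag of the individual part
	}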

weed/s3api/s3_sse_s3_multipart_test.go (5 lines changed)

@@ -20,7 +20,7 @@ func TestSSES3MultipartChunkViewDecryption(t *testing.T) {
// Create test plaintext
plaintext := []byte("This is test data for SSE-S3 multipart encryption testing")
// Simulate multipart upload with 2 parts at different offsets
testCases := []struct {
name string
@@ -217,7 +217,7 @@ func TestSSES3ChunkMetadataDetection(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
hasPerChunkMetadata := tc.chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(tc.chunk.GetSseMetadata()) > 0
if hasPerChunkMetadata != tc.expectedMultipart {
t.Errorf("Expected multipart=%v, got hasPerChunkMetadata=%v", tc.expectedMultipart, hasPerChunkMetadata)
}
@@ -264,4 +264,3 @@ func TestSSES3EncryptionConsistency(t *testing.T) {
t.Error("Second decryption should also work with fresh stream")
}
}

weed/s3api/s3api_bucket_policy_arn_test.go (3 lines changed)

@@ -2,7 +2,7 @@ package s3api
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
@@ -123,4 +123,3 @@ func TestBuildPrincipalARN(t *testing.T) {
})
}
}

weed/s3api/s3api_bucket_policy_engine.go (4 lines changed)

@@ -64,7 +64,7 @@ func (bpe *BucketPolicyEngine) LoadBucketPolicyFromCache(bucket string, policyDo
glog.Errorf("Failed to convert bucket policy for %s: %v", bucket, err)
return fmt.Errorf("failed to convert bucket policy: %w", err)
}
// Marshal the converted policy to JSON for storage in the engine
policyJSON, err := json.Marshal(enginePolicyDoc)
if err != nil {
@@ -152,7 +152,7 @@ func (bpe *BucketPolicyEngine) EvaluatePolicyWithContext(bucket, object, action,
// Build resource ARN
resource := buildResourceARN(bucket, object)
glog.V(4).Infof("EvaluatePolicyWithContext: bucket=%s, resource=%s, action=%s (from %s), principal=%s",
bucket, resource, s3Action, action, principal)
// Evaluate using the policy engine
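
The log line above shows the engine evaluating against a resource ARN built from bucket and object; a sketch of the standard S3 ARN shapes involved (buildResourceARN's actual implementation is not part of this diff):

	func buildResourceARNSketch(bucket, object string) string {
		if object == "" {
			return "arn:aws:s3:::" + bucket // bucket-level actions
		}
		return "arn:aws:s3:::" + bucket + "/" + object // object-level actions
	}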

weed/s3api/s3api_implicit_directory_test.go (19 lines changed)

@@ -122,10 +122,10 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) {
// }
// }
// }
isZeroByteFile := tt.fileSize == 0 && !tt.isDirectory
isActualDirectory := tt.isDirectory
shouldReturn404 := false
if !tt.versioningEnabled && !tt.hasTrailingSlash {
if isZeroByteFile || isActualDirectory {
@@ -134,7 +134,7 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) {
}
}
}
if shouldReturn404 != tt.shouldReturn404 {
t.Errorf("Logic mismatch for %s:\n Expected shouldReturn404=%v\n Got shouldReturn404=%v\n Description: %s",
tt.name, tt.shouldReturn404, shouldReturn404, tt.description)
@@ -180,9 +180,9 @@ func TestHasChildrenLogic(t *testing.T) {
description: "Should return false when no children exist (EOF)",
},
{
name:         "Directory with leading slash in prefix",
bucket:       "test-bucket",
prefix:       "/dataset",
listResponse: &filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
Name: "file.parquet",
@@ -202,14 +202,14 @@ func TestHasChildrenLogic(t *testing.T) {
// 2. It should list with Limit=1
// 3. It should return true if any entry is received
// 4. It should return false if EOF is received
hasChildren := false
if tt.listError == nil && tt.listResponse != nil {
hasChildren = true
} else if tt.listError == io.EOF {
hasChildren = false
}
if hasChildren != tt.expectedResult {
t.Errorf("hasChildren logic mismatch for %s:\n Expected: %v\n Got: %v\n Description: %s",
tt.name, tt.expectedResult, hasChildren, tt.description)
@@ -273,7 +273,7 @@ func TestImplicitDirectoryIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
t.Skip("Integration test - run manually with: cd test/s3/parquet && make test-implicit-dir-with-server")
}
@@ -283,4 +283,3 @@ func BenchmarkHasChildrenCheck(b *testing.B) {
// Expected: ~1-5ms per call (one gRPC LIST request with Limit=1)
b.Skip("Benchmark - requires full filer setup")
}

weed/s3api/s3api_object_handlers_copy.go (2 lines changed)

@@ -513,7 +513,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
if endOffset >= startOffset {
partSize = uint64(endOffset - startOffset + 1)
}
dstEntry := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
FileSize: partSize,

weed/s3api/s3api_object_handlers_multipart.go (12 lines changed)

@@ -380,12 +380,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV))
} else {
// Check if this upload uses SSE-S3
if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil {
	glog.Errorf("Failed to setup SSE-S3 multipart headers: %v", err)
	s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
	return
}
}
}
} else if !errors.Is(err, filer_pb.ErrNotFound) {

weed/s3api/s3api_object_handlers_put.go (14 lines changed)

@@ -372,7 +372,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
})
if err != nil {
glog.Errorf("putToFiler: chunked upload failed: %v", err)
// CRITICAL: Cleanup orphaned chunks before returning error
// UploadReaderInChunks now returns partial results even on error,
// allowing us to cleanup any chunks that were successfully uploaded
@@ -381,7 +381,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
glog.Warningf("putToFiler: Upload failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
s3a.deleteOrphanedChunks(chunkResult.FileChunks)
}
if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
}
@@ -432,7 +432,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
// SSE-KMS: Create per-chunk metadata with chunk-specific offsets
// Each chunk needs its own metadata with ChunkOffset set for proper IV calculation during decryption
chunk.SseType = filer_pb.SSEType_SSE_KMS
// Create a copy of the SSE-KMS key with chunk-specific offset
chunkSSEKey := &SSEKMSKey{
KeyID: sseKMSKey.KeyID,
@@ -442,7 +442,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
IV: sseKMSKey.IV,
ChunkOffset: chunk.Offset, // Set chunk-specific offset for IV calculation
}
// Serialize per-chunk metadata
if chunkMetadata, serErr := SerializeSSEKMSMetadata(chunkSSEKey); serErr == nil {
chunk.SseMetadata = chunkMetadata
@@ -453,10 +453,10 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
// SSE-S3: Create per-chunk metadata with chunk-specific IVs
// Each chunk needs its own IV calculated from the base IV + chunk offset
chunk.SseType = filer_pb.SSEType_SSE_S3
// Calculate chunk-specific IV using base IV and chunk offset
chunkIV, _ := calculateIVWithOffset(sseS3Key.IV, chunk.Offset)
// Create a copy of the SSE-S3 key with chunk-specific IV
chunkSSEKey := &SSES3Key{
Key: sseS3Key.Key,
@@ -464,7 +464,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
Algorithm: sseS3Key.Algorithm,
IV: chunkIV, // Use chunk-specific IV
}
// Serialize per-chunk metadata
if chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey); serErr == nil {
chunk.SseMetadata = chunkMetadata

weed/s3api/s3api_server.go (4 lines changed)

@@ -90,7 +90,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize bucket policy engine first
policyEngine := NewBucketPolicyEngine()
s3ApiServer = &S3ApiServer{
option: option,
iam: iam,
@@ -171,7 +171,7 @@ func (s3a *S3ApiServer) syncBucketPolicyToEngine(bucket string, policyDoc *polic
if s3a.policyEngine == nil {
return
}
if policyDoc != nil {
if err := s3a.policyEngine.LoadBucketPolicyFromCache(bucket, policyDoc); err != nil {
glog.Errorf("Failed to sync bucket policy for %s to policy engine: %v", bucket, err)

weed/s3api/s3api_sse_chunk_metadata_test.go (11 lines changed)

@@ -36,8 +36,8 @@ func TestSSEKMSChunkMetadataAssignment(t *testing.T) {
// Simulate multi-chunk upload scenario (what putToFiler does after UploadReaderInChunks)
simulatedChunks := []*filer_pb.FileChunk{
{FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024},               // 8MB chunk at offset 0
{FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB
{FileId: "chunk3", Offset: 16 * 1024 * 1024, Size: 4 * 1024 * 1024}, // 4MB chunk at offset 16MB
}
@@ -110,7 +110,7 @@ func TestSSEKMSChunkMetadataAssignment(t *testing.T) {
t.Errorf("Chunk %d: KeyID mismatch", i)
}
t.Logf("✓ Chunk %d: metadata deserialized successfully (ChunkOffset=%d, KeyID=%s)",
i, deserializedKey.ChunkOffset, deserializedKey.KeyID)
}
@@ -167,8 +167,8 @@ func TestSSES3ChunkMetadataAssignment(t *testing.T) {
// Simulate multi-chunk upload scenario (what putToFiler does after UploadReaderInChunks)
simulatedChunks := []*filer_pb.FileChunk{
{FileId: "chunk1", Offset: 0, Size: 8 * 1024 * 1024},               // 8MB chunk at offset 0
{FileId: "chunk2", Offset: 8 * 1024 * 1024, Size: 8 * 1024 * 1024}, // 8MB chunk at offset 8MB
{FileId: "chunk3", Offset: 16 * 1024 * 1024, Size: 4 * 1024 * 1024}, // 4MB chunk at offset 16MB
}
@@ -359,4 +359,3 @@ func TestSSEChunkMetadataComparison(t *testing.T) {
}
})
}

weed/s3api/s3api_sse_decrypt_test.go (19 lines changed)

@@ -73,12 +73,12 @@ func TestSSECDecryptChunkView_NoOffsetAdjustment(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create decrypted reader (wrong): %v", err)
}
// Skip ivSkip bytes (as the buggy code would do)
if ivSkip > 0 {
io.CopyN(io.Discard, decryptedReaderWrong, int64(ivSkip))
}
decryptedWrong, err := io.ReadAll(decryptedReaderWrong)
if err != nil {
t.Fatalf("Failed to read decrypted data (wrong): %v", err)
@@ -107,7 +107,7 @@
func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) {
// Setup: Create test data
plaintext := []byte("This is a test message for SSE-KMS decryption with offset adjustment")
// Generate base IV and key
baseIV := make([]byte, aes.BlockSize)
key := make([]byte, 32)
@@ -126,10 +126,10 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create cipher: %v", err)
}
ciphertext := make([]byte, len(plaintext))
stream := cipher.NewCTR(block, adjustedIV)
// Skip ivSkip bytes in the encryption stream if needed
if ivSkip > 0 {
dummy := make([]byte, ivSkip)
@@ -143,10 +143,10 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create cipher for decryption: %v", err)
}
decrypted := make([]byte, len(ciphertext))
streamDecrypt := cipher.NewCTR(blockDecrypt, adjustedIVDecrypt)
// Skip ivSkip bytes in the decryption stream
if ivSkipDecrypt > 0 {
dummy := make([]byte, ivSkipDecrypt)
@@ -166,7 +166,7 @@ func TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create cipher for wrong decryption: %v", err)
}
decryptedWrong := make([]byte, len(ciphertext))
streamWrong := cipher.NewCTR(blockWrong, baseIV) // Use base IV directly - WRONG for SSE-KMS
streamWrong.XORKeyStream(decryptedWrong, ciphertext)
@@ -184,7 +184,6 @@ func TestSSEDecryptionDifferences(t *testing.T) {
t.Log("SSE-C: Random IV per part → Use stored IV DIRECTLY (no offset)")
t.Log("SSE-KMS: Base IV + offset → MUST call calculateIVWithOffset(baseIV, offset)")
t.Log("SSE-S3: Base IV + offset → Stores ADJUSTED IV, use directly")
// This test documents the critical differences and serves as executable documentation
}
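
These tests lean on calculateIVWithOffset(baseIV, offset) returning an adjusted IV plus an ivSkip count. A sketch of the CTR counter arithmetic this implies, assuming the IV is treated as a 128-bit big-endian counter (the real calculateIVWithOffset may differ in details):

	func deriveIVSketch(baseIV []byte, offset int64) (iv []byte, ivSkip int) {
		iv = make([]byte, 16)
		copy(iv, baseIV)
		carry := uint64(offset / 16) // whole AES blocks before this offset
		// add `carry` to the IV, treating it as a big-endian integer
		for i := 15; i >= 0 && carry > 0; i-- {
			sum := uint64(iv[i]) + (carry & 0xFF)
			iv[i] = byte(sum)
			carry = (carry >> 8) + (sum >> 8)
		}
		return iv, int(offset % 16) // keystream bytes to discard before use
	}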

weed/s3api/s3api_sse_s3_upload_test.go (9 lines changed)

@@ -88,7 +88,7 @@ func TestSSES3MultipartUploadStoresDerivedIV(t *testing.T) {
t.Fatalf("Failed to create cipher: %v", err)
}
stream := cipher.NewCTR(block, keyWithDerivedIV.IV)
// Handle ivSkip for non-block-aligned offsets
if ivSkip > 0 {
skipDummy := make([]byte, ivSkip)
@@ -136,7 +136,7 @@ func TestSSES3MultipartUploadStoresDerivedIV(t *testing.T) {
// returned key contains the derived IV (not base IV).
func TestHandleSSES3MultipartEncryptionFlow(t *testing.T) {
// This test simulates what happens in a real multipart upload request
// Generate test key manually (simulating a complete SSE-S3 key)
keyBytes := make([]byte, 32) // 256-bit key
if _, err := rand.Read(keyBytes); err != nil {
@@ -209,9 +209,9 @@ func TestHandleSSES3MultipartEncryptionFlow(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create cipher: %v", err)
}
stream := cipher.NewCTR(block, originalKey.IV)
// Handle ivSkip for non-block-aligned offsets
if ivSkip > 0 {
skipDummy := make([]byte, ivSkip)
@@ -255,4 +255,3 @@ func TestSSES3HeaderEncoding(t *testing.T) {
s3_constants.AESBlockSize, len(decodedBaseIV))
}
}

weed/util/log_buffer/log_buffer_corruption_test.go (63 lines changed)

@@ -15,32 +15,32 @@ import (
func TestReadTsCorruptedBuffer(t *testing.T) {
// Create a corrupted buffer with invalid protobuf data
buf := make([]byte, 100)
// Set size field to 10 bytes (using proper encoding)
util.Uint32toBytes(buf[0:4], 10)
// Fill with garbage data that won't unmarshal as LogEntry
for i := 4; i < 14; i++ {
buf[i] = 0xFF
}
// Attempt to read timestamp
size, ts, err := readTs(buf, 0)
// Should return an error
if err == nil {
t.Error("Expected error for corrupted buffer, got nil")
}
// Size and ts should be zero on error
if size != 0 {
t.Errorf("Expected size=0 on error, got %d", size)
}
if ts != 0 {
t.Errorf("Expected ts=0 on error, got %d", ts)
}
// Error should indicate corruption
if !errors.Is(err, ErrBufferCorrupted) {
t.Logf("Error message: %v", err)
@@ -49,7 +49,7 @@ func TestReadTsCorruptedBuffer(t *testing.T) {
t.Error("Expected non-empty error message")
}
}
t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err)
}
@@ -60,42 +60,42 @@ func TestReadTsValidBuffer(t *testing.T) {
TsNs: 123456789,
Key: []byte("test-key"),
}
// Marshal it
data, err := proto.Marshal(logEntry)
if err != nil {
t.Fatalf("Failed to marshal LogEntry: %v", err)
}
// Create buffer with size prefix using util function
buf := make([]byte, 4+len(data))
util.Uint32toBytes(buf[0:4], uint32(len(data)))
copy(buf[4:], data)
// Read timestamp
size, ts, err := readTs(buf, 0)
// Should succeed
if err != nil {
t.Fatalf("Expected no error for valid buffer, got: %v", err)
}
// Should return correct values
if size != len(data) {
t.Errorf("Expected size=%d, got %d", len(data), size)
}
if ts != logEntry.TsNs {
t.Errorf("Expected ts=%d, got %d", logEntry.TsNs, ts)
}
t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", size, ts)
}
// TestReadFromBufferCorruption tests that ReadFromBuffer propagates corruption errors
func TestReadFromBufferCorruption(t *testing.T) {
lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {})
// Add a valid entry first using AddDataToBuffer
validKey := []byte("valid")
validData, _ := proto.Marshal(&filer_pb.LogEntry{
@@ -105,7 +105,7 @@ func TestReadFromBufferCorruption(t *testing.T) {
if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil {
t.Fatalf("Failed to add data to buffer: %v", err)
}
// Manually corrupt the buffer by writing garbage
// This simulates a corruption scenario
if len(lb.idx) > 0 {
@@ -115,11 +115,11 @@ func TestReadFromBufferCorruption(t *testing.T) {
lb.buf[i] = 0xFF
}
}
// Try to read - should detect corruption
startPos := MessagePosition{Time: lb.startTime}
buf, offset, err := lb.ReadFromBuffer(startPos)
// Should return corruption error
if err == nil {
t.Error("Expected corruption error, got nil")
@@ -133,7 +133,7 @@ func TestReadFromBufferCorruption(t *testing.T) {
}
t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err)
}
t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err)
}
@@ -144,18 +144,18 @@ func TestLocateByTsCorruption(t *testing.T) {
buf: make([]byte, 100),
size: 14,
}
// Set size field (using proper encoding)
util.Uint32toBytes(mb.buf[0:4], 10)
// Fill with garbage
for i := 4; i < 14; i++ {
mb.buf[i] = 0xFF
}
// Try to locate by timestamp
pos, err := mb.locateByTs(mb.startTime)
// Should return error
if err == nil {
t.Errorf("Expected corruption error, got nil (pos=%d)", pos)
@@ -170,17 +170,17 @@ func TestErrorPropagationChain(t *testing.T) {
// Already covered by TestReadTsCorruptedBuffer
t.Log("✓ readTs error propagation tested")
})
t.Run("Corruption in locateByTs", func(t *testing.T) {
// Already covered by TestLocateByTsCorruption
// Already covered by TestLocateByTsCorruption
})
t.Run("Corruption in ReadFromBuffer binary search", func(t *testing.T) {
// Already covered by TestReadFromBufferCorruption
t.Log("✓ ReadFromBuffer error propagation tested")
})
t.Log("✓ Complete error propagation chain verified")
}
@@ -203,18 +203,18 @@ func TestNoSilentCorruption(t *testing.T) {
pos: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
size, ts, err := readTs(tc.buf, tc.pos)
// CRITICAL: Must return error, never silent (0, 0)
if err == nil {
t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts)
} else {
t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err)
}
// On error, size and ts should be 0
if size != 0 || ts != 0 {
t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts)
@@ -222,4 +222,3 @@ func TestNoSilentCorruption(t *testing.T) {
})
}
}
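
For reference, the record layout these tests exercise is a 4-byte length prefix (written with util.Uint32toBytes) followed by a marshaled filer_pb.LogEntry. A sketch of the parse-and-validate step readTs performs (structure inferred from the tests; the error wrapping is illustrative):

	func parseTsSketch(buf []byte, pos int) (size int, tsNs int64, err error) {
		size = int(util.BytesToUint32(buf[pos : pos+4]))
		logEntry := &filer_pb.LogEntry{}
		if err = proto.Unmarshal(buf[pos+4:pos+4+size], logEntry); err != nil {
			// surface corruption instead of silently returning (0, 0)
			return 0, 0, fmt.Errorf("%w at pos %d: %v", ErrBufferCorrupted, pos, err)
		}
		return size, logEntry.TsNs, nil
	}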

weed/util/log_buffer/log_buffer_test.go (20 lines changed)

@@ -141,16 +141,16 @@ func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) {
// Add some data to the buffer if needed (at current offset position)
if tt.hasData {
	testData := []byte("test message")
	// Use AddLogEntryToBuffer to preserve offset information
	if err := lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
		TsNs:   time.Now().UnixNano(),
		Key:    []byte("key"),
		Data:   testData,
		Offset: tt.currentOffset, // Add data at current offset
	}); err != nil {
		t.Fatalf("Failed to add log entry: %v", err)
	}
}
// Create an offset-based position for the requested offset

weed/util/log_buffer/log_read.go (4 lines changed)

@@ -223,13 +223,13 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
}
bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition)
glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err)
// Check for buffer corruption error before other error handling
if err != nil && errors.Is(err, ErrBufferCorrupted) {
glog.Errorf("%s: Buffer corruption detected: %v", readerName, err)
return lastReadPosition, true, fmt.Errorf("buffer corruption: %w", err)
}
if err == ResumeFromDiskError {
// Try to read from disk if readFromDiskFn is available
if logBuffer.ReadFromDiskFn != nil {
