Browse Source

add unit tests

pull/7519/head
chrislu 2 weeks ago
parent
commit
53dfc43d2b
  1. 467
      weed/s3api/s3api_bucket_handlers_test.go

467
weed/s3api/s3api_bucket_handlers_test.go

@ -3,7 +3,9 @@ package s3api
import (
"encoding/json"
"encoding/xml"
"fmt"
"net/http/httptest"
"strings"
"testing"
"time"
@ -248,3 +250,468 @@ func TestListAllMyBucketsResultNamespace(t *testing.T) {
t.Logf("Generated XML:\n%s", xmlString)
}
// TestListBucketsOwnershipFiltering tests that ListBucketsHandler properly filters
// buckets based on ownership, allowing only bucket owners (or admins) to see their buckets
func TestListBucketsOwnershipFiltering(t *testing.T) {
testCases := []struct {
name string
buckets []testBucket
requestIdentityId string
requestIsAdmin bool
expectedBucketNames []string
description string
}{
{
name: "non-admin sees only owned buckets",
buckets: []testBucket{
{name: "user1-bucket", ownerId: "user1"},
{name: "user2-bucket", ownerId: "user2"},
{name: "user1-bucket2", ownerId: "user1"},
},
requestIdentityId: "user1",
requestIsAdmin: false,
expectedBucketNames: []string{"user1-bucket", "user1-bucket2"},
description: "Non-admin user should only see buckets they own",
},
{
name: "admin sees all buckets",
buckets: []testBucket{
{name: "user1-bucket", ownerId: "user1"},
{name: "user2-bucket", ownerId: "user2"},
{name: "user3-bucket", ownerId: "user3"},
},
requestIdentityId: "admin",
requestIsAdmin: true,
expectedBucketNames: []string{"user1-bucket", "user2-bucket", "user3-bucket"},
description: "Admin should see all buckets regardless of owner",
},
{
name: "buckets without owner are visible to all non-admins",
buckets: []testBucket{
{name: "owned-bucket", ownerId: "user1"},
{name: "unowned-bucket", ownerId: ""}, // No owner set
},
requestIdentityId: "user2",
requestIsAdmin: false,
expectedBucketNames: []string{"unowned-bucket"},
description: "Buckets without owner should be visible to all users (backward compatibility)",
},
{
name: "empty identityId allows only unowned buckets",
buckets: []testBucket{
{name: "owned-bucket", ownerId: "user1"},
{name: "unowned-bucket", ownerId: ""},
},
requestIdentityId: "",
requestIsAdmin: false,
expectedBucketNames: []string{"owned-bucket", "unowned-bucket"},
description: "When identityId is empty, ownership check is skipped",
},
{
name: "admin with empty identityId sees all",
buckets: []testBucket{
{name: "user1-bucket", ownerId: "user1"},
{name: "user2-bucket", ownerId: "user2"},
},
requestIdentityId: "",
requestIsAdmin: true,
expectedBucketNames: []string{"user1-bucket", "user2-bucket"},
description: "Admin should see all buckets even with empty identityId",
},
{
name: "buckets with nil Extended metadata",
buckets: []testBucket{
{name: "bucket-no-extended", ownerId: "", nilExtended: true},
{name: "bucket-with-owner", ownerId: "user1"},
},
requestIdentityId: "user1",
requestIsAdmin: false,
expectedBucketNames: []string{"bucket-no-extended", "bucket-with-owner"},
description: "Buckets with nil Extended should be treated as unowned",
},
{
name: "user sees only their bucket among many",
buckets: []testBucket{
{name: "alice-bucket", ownerId: "alice"},
{name: "bob-bucket", ownerId: "bob"},
{name: "charlie-bucket", ownerId: "charlie"},
{name: "alice-bucket2", ownerId: "alice"},
},
requestIdentityId: "bob",
requestIsAdmin: false,
expectedBucketNames: []string{"bob-bucket"},
description: "User should see only their single bucket among many",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create mock entries
entries := make([]*filer_pb.Entry, 0, len(tc.buckets))
for _, bucket := range tc.buckets {
entry := &filer_pb.Entry{
Name: bucket.name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
if !bucket.nilExtended {
entry.Extended = make(map[string][]byte)
if bucket.ownerId != "" {
entry.Extended[s3_constants.AmzIdentityId] = []byte(bucket.ownerId)
}
}
entries = append(entries, entry)
}
// Filter entries using the same logic as ListBucketsHandler
var filteredBuckets []string
for _, entry := range entries {
if entry.IsDirectory {
// Apply ownership filtering logic (lines 67-74 from s3api_bucket_handlers.go)
shouldSkip := false
if tc.requestIdentityId != "" && !tc.requestIsAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != tc.requestIdentityId {
shouldSkip = true
}
}
}
if !shouldSkip {
filteredBuckets = append(filteredBuckets, entry.Name)
}
}
}
// Assert expected buckets match filtered buckets
assert.ElementsMatch(t, tc.expectedBucketNames, filteredBuckets,
"%s - Expected buckets: %v, Got: %v", tc.description, tc.expectedBucketNames, filteredBuckets)
})
}
}
// testBucket represents a bucket for testing with ownership metadata
type testBucket struct {
name string
ownerId string
nilExtended bool
}
// TestListBucketsOwnershipEdgeCases tests edge cases in ownership filtering
func TestListBucketsOwnershipEdgeCases(t *testing.T) {
t.Run("malformed owner id with special characters", func(t *testing.T) {
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("user@domain.com"),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := "user@domain.com"
isAdmin := false
// Should match exactly even with special characters
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.False(t, shouldSkip, "Should match owner ID with special characters exactly")
})
t.Run("owner id with unicode characters", func(t *testing.T) {
unicodeOwnerId := "用户123"
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte(unicodeOwnerId),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := unicodeOwnerId
isAdmin := false
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.False(t, shouldSkip, "Should handle unicode owner IDs correctly")
})
t.Run("owner id with binary data", func(t *testing.T) {
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte{0x00, 0x01, 0x02, 0xFF},
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := "normaluser"
isAdmin := false
// Should not panic when converting binary data to string
assert.NotPanics(t, func() {
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.True(t, shouldSkip, "Binary owner ID should not match normal user")
})
})
t.Run("empty owner id in Extended", func(t *testing.T) {
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte(""),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := "user1"
isAdmin := false
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.False(t, shouldSkip, "Empty owner ID should be treated as unowned (visible to all)")
})
t.Run("nil Extended map safe access", func(t *testing.T) {
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: nil, // Explicitly nil
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := "user1"
isAdmin := false
// Should not panic with nil Extended map
assert.NotPanics(t, func() {
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.False(t, shouldSkip, "Nil Extended should be treated as unowned")
})
})
t.Run("very long owner id", func(t *testing.T) {
longOwnerId := strings.Repeat("a", 10000)
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte(longOwnerId),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
requestIdentityId := longOwnerId
isAdmin := false
// Should handle very long owner IDs without panic
assert.NotPanics(t, func() {
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
assert.False(t, shouldSkip, "Long owner ID should match correctly")
})
})
}
// TestListBucketsOwnershipWithPermissions tests that ownership filtering
// works in conjunction with permission checks
func TestListBucketsOwnershipWithPermissions(t *testing.T) {
t.Run("ownership check before permission check", func(t *testing.T) {
// Simulate scenario where ownership check filters first,
// then permission check applies to remaining buckets
entries := []*filer_pb.Entry{
{
Name: "owned-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("user1"),
},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
{
Name: "other-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("user2"),
},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
}
requestIdentityId := "user1"
isAdmin := false
// First pass: ownership filtering
var afterOwnershipFilter []*filer_pb.Entry
for _, entry := range entries {
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
if !shouldSkip {
afterOwnershipFilter = append(afterOwnershipFilter, entry)
}
}
// Only owned-bucket should remain after ownership filter
assert.Len(t, afterOwnershipFilter, 1, "Only owned bucket should pass ownership filter")
assert.Equal(t, "owned-bucket", afterOwnershipFilter[0].Name)
// Permission checks would apply to afterOwnershipFilter entries
// (not tested here as it depends on IAM system)
})
t.Run("admin bypasses ownership but not permissions", func(t *testing.T) {
entries := []*filer_pb.Entry{
{
Name: "user1-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("user1"),
},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
{
Name: "user2-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("user2"),
},
Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()},
},
}
requestIdentityId := "admin-user"
isAdmin := true
// Admin bypasses ownership check
var afterOwnershipFilter []*filer_pb.Entry
for _, entry := range entries {
shouldSkip := false
if requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != requestIdentityId {
shouldSkip = true
}
}
}
if !shouldSkip {
afterOwnershipFilter = append(afterOwnershipFilter, entry)
}
}
// Admin should see all buckets after ownership filter
assert.Len(t, afterOwnershipFilter, 2, "Admin should see all buckets after ownership filter")
// Note: Permission checks still apply to admins in actual implementation
})
}
// TestListBucketsOwnershipCaseSensitivity tests case sensitivity in owner matching
func TestListBucketsOwnershipCaseSensitivity(t *testing.T) {
entry := &filer_pb.Entry{
Name: "test-bucket",
IsDirectory: true,
Extended: map[string][]byte{
s3_constants.AmzIdentityId: []byte("User1"),
},
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
},
}
testCases := []struct {
requestIdentityId string
shouldMatch bool
}{
{"User1", true},
{"user1", false}, // Case sensitive
{"USER1", false}, // Case sensitive
{"User2", false},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("identity_%s", tc.requestIdentityId), func(t *testing.T) {
isAdmin := false
shouldSkip := false
if tc.requestIdentityId != "" && !isAdmin {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if bucketOwnerId := string(id); bucketOwnerId != "" && bucketOwnerId != tc.requestIdentityId {
shouldSkip = true
}
}
}
if tc.shouldMatch {
assert.False(t, shouldSkip, "Identity %s should match (case sensitive)", tc.requestIdentityId)
} else {
assert.True(t, shouldSkip, "Identity %s should not match (case sensitive)", tc.requestIdentityId)
}
})
}
}
Loading…
Cancel
Save