diff --git a/test/s3tables/catalog_trino/trino_blog_operations_test.go b/test/s3tables/catalog_trino/trino_blog_operations_test.go new file mode 100644 index 000000000..6603507d5 --- /dev/null +++ b/test/s3tables/catalog_trino/trino_blog_operations_test.go @@ -0,0 +1,172 @@ +package catalog_trino + +import ( + "fmt" + "strconv" + "strings" + "testing" + "time" +) + +func TestTrinoBlogOperations(t *testing.T) { + env := setupTrinoTest(t) + defer env.Cleanup(t) + + schemaName := "blog_ns_" + randomString(6) + customersTable := "customers_" + randomString(6) + trinoCustomersTable := "trino_customers_" + randomString(6) + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) + defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP SCHEMA IF EXISTS iceberg.%s", schemaName)) + defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, trinoCustomersTable)) + defer runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DROP TABLE IF EXISTS iceberg.%s.%s", schemaName, customersTable)) + + createCustomersSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS iceberg.%s.%s ( + customer_sk INT, + customer_id VARCHAR, + salutation VARCHAR, + first_name VARCHAR, + last_name VARCHAR, + preferred_cust_flag VARCHAR, + birth_day INT, + birth_month INT, + birth_year INT, + birth_country VARCHAR, + login VARCHAR +) WITH ( + format = 'PARQUET', + sorted_by = ARRAY['customer_id'] +)`, schemaName, customersTable) + runTrinoSQL(t, env.trinoContainer, createCustomersSQL) + + insertCustomersSQL := fmt.Sprintf(`INSERT INTO iceberg.%s.%s VALUES + (1, 'AAAAA', 'Mrs', 'Amanda', 'Olson', 'Y', 8, 4, 1984, 'US', 'aolson'), + (2, 'AAAAB', 'Mr', 'Leonard', 'Eads', 'N', 22, 6, 2001, 'US', 'leads'), + (3, 'BAAAA', 'Mr', 'David', 'White', 'Y', 16, 2, 1999, 'US', 'dwhite'), + (4, 'BBAAA', 'Mr', 'Melvin', 'Lee', 'N', 30, 3, 1973, 'US', 'mlee'), + (5, 'AACAA', 'Mr', 'Donald', 'Holt', 'N', 2, 6, 1982, 'CA', 'dholt'), + (6, 
'ABAAA', 'Mrs', 'Jacqueline', 'Harvey', 'N', 5, 12, 1988, 'US', 'jharvey'), + (7, 'BBAAA', 'Ms', 'Debbie', 'Ward', 'N', 6, 1, 2006, 'MX', 'dward'), + (8, 'ACAAA', 'Mr', 'Tim', 'Strong', 'N', 15, 7, 1976, 'US', 'tstrong') +`, schemaName, customersTable) + runTrinoSQL(t, env.trinoContainer, insertCustomersSQL) + + countOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, customersTable)) + rowCount := mustParseCSVInt64(t, countOutput) + if rowCount != 8 { + t.Fatalf("expected 8 rows in customers table, got %d", rowCount) + } + + output := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT first_name FROM iceberg.%s.%s WHERE customer_sk = 1", schemaName, customersTable)) + if !strings.Contains(output, "Amanda") { + t.Fatalf("expected sample query to include Amanda, got: %s", output) + } + + ctasSQL := fmt.Sprintf(`CREATE TABLE iceberg.%s.%s +WITH ( + format = 'PARQUET' +) +AS SELECT * FROM iceberg.%s.%s`, schemaName, trinoCustomersTable, schemaName, customersTable) + runTrinoSQL(t, env.trinoContainer, ctasSQL) + + countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable)) + rowCount = mustParseCSVInt64(t, countOutput) + if rowCount != 8 { + t.Fatalf("expected 8 rows in CTAS table, got %d", rowCount) + } + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("ALTER TABLE iceberg.%s.%s ADD COLUMN updated_at TIMESTAMP", schemaName, trinoCustomersTable)) + output = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DESCRIBE iceberg.%s.%s", schemaName, trinoCustomersTable)) + if !strings.Contains(output, "updated_at") { + t.Fatalf("expected updated_at column in describe output, got: %s", output) + } + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("UPDATE iceberg.%s.%s SET updated_at = current_timestamp", schemaName, trinoCustomersTable)) + + // Sleep to ensure timestamps are in the past for time travel queries + time.Sleep(1 * time.Second) + + 
snapshotOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT snapshot_id FROM iceberg.%s."%s$snapshots" ORDER BY committed_at DESC LIMIT 1`, schemaName, trinoCustomersTable)) + snapshotID := mustParseCSVInt64(t, snapshotOutput) + if snapshotID == 0 { + t.Fatalf("expected snapshot ID from snapshots table, got 0") + } + + filesOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT file_path FROM iceberg.%s."%s$files" LIMIT 1`, schemaName, trinoCustomersTable)) + if !hasCSVDataRow(filesOutput) { + t.Fatalf("expected files metadata rows, got: %s", filesOutput) + } + + historyOutput := runTrinoSQL(t, env.trinoContainer, fmt.Sprintf(`SELECT made_current_at FROM iceberg.%s."%s$history" LIMIT 1`, schemaName, trinoCustomersTable)) + if !hasCSVDataRow(historyOutput) { + t.Fatalf("expected history metadata rows, got: %s", historyOutput) + } + + countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s FOR VERSION AS OF %d", schemaName, trinoCustomersTable, snapshotID)) + versionCount := mustParseCSVInt64(t, countOutput) + if versionCount != 8 { + t.Fatalf("expected 8 rows for version time travel, got %d", versionCount) + } + + // Use current_timestamp - interval '1 second' to ensure it's in the past (Iceberg requirement) + countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s FOR TIMESTAMP AS OF (current_timestamp - interval '1' second)", schemaName, trinoCustomersTable)) + timestampCount := mustParseCSVInt64(t, countOutput) + if timestampCount != 8 { + t.Fatalf("expected 8 rows for timestamp time travel, got %d", timestampCount) + } + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("DELETE FROM iceberg.%s.%s WHERE customer_sk = 8", schemaName, trinoCustomersTable)) + countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable)) + rowCount = mustParseCSVInt64(t, countOutput) + if rowCount != 7 { + 
t.Fatalf("expected 7 rows after delete, got %d", rowCount) + } + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("ALTER TABLE iceberg.%s.%s EXECUTE rollback_to_snapshot(%d)", schemaName, trinoCustomersTable, snapshotID)) + countOutput = runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SELECT count(*) FROM iceberg.%s.%s", schemaName, trinoCustomersTable)) + rowCount = mustParseCSVInt64(t, countOutput) + if rowCount != 8 { + t.Fatalf("expected 8 rows after rollback, got %d", rowCount) + } +} + +func hasCSVDataRow(output string) bool { + lines := strings.Split(strings.TrimSpace(output), "\n") + if len(lines) == 0 { + return false + } + for _, line := range lines { + if strings.TrimSpace(line) != "" { + return true + } + } + return false +} + +func mustParseCSVInt64(t *testing.T, output string) int64 { + t.Helper() + value := mustFirstCSVValue(t, output) + parsed, err := strconv.ParseInt(value, 10, 64) + if err != nil { + t.Fatalf("failed to parse int from output %q: %v", output, err) + } + return parsed +} + +func mustFirstCSVValue(t *testing.T, output string) string { + t.Helper() + lines := strings.Split(strings.TrimSpace(output), "\n") + if len(lines) == 0 { + t.Fatalf("expected CSV output with data row, got: %q", output) + } + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + parts := strings.Split(line, ",") + return strings.Trim(parts[0], "\"") + } + t.Fatalf("no CSV data rows found in output: %q", output) + return "" +} diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go index 74dec1637..44beea80c 100644 --- a/test/s3tables/catalog_trino/trino_catalog_test.go +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -319,8 +319,9 @@ func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket strin config := fmt.Sprintf(`connector.name=iceberg iceberg.catalog.type=rest iceberg.rest-catalog.uri=http://host.docker.internal:%d 
-iceberg.rest-catalog.warehouse=s3://%s/ +iceberg.rest-catalog.warehouse=s3tablescatalog/%s iceberg.file-format=PARQUET +iceberg.unique-table-location=true # S3 storage config fs.native-s3.enabled=true @@ -415,7 +416,7 @@ func runTrinoSQL(t *testing.T, containerName, sql string) string { logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) } - return string(output) + return sanitizeTrinoOutput(string(output)) } func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { @@ -439,6 +440,30 @@ func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Logf("Created table bucket: %s", bucketName) } +func sanitizeTrinoOutput(output string) string { + lines := strings.Split(strings.TrimSpace(output), "\n") + filtered := make([]string, 0, len(lines)) + for _, line := range lines { + if strings.Contains(line, "org.jline.utils.Log") { + continue + } + if strings.Contains(line, "Unable to create a system terminal") { + continue + } + if strings.HasPrefix(line, "WARNING:") { + continue + } + if strings.TrimSpace(line) == "" { + continue + } + filtered = append(filtered, line) + } + if len(filtered) == 0 { + return "" + } + return strings.Join(filtered, "\n") + "\n" +} + func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Helper() diff --git a/weed/admin/dash/file_browser_data.go b/weed/admin/dash/file_browser_data.go index 4126b2ac8..f5b74f84b 100644 --- a/weed/admin/dash/file_browser_data.go +++ b/weed/admin/dash/file_browser_data.go @@ -6,7 +6,9 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" ) // FileEntry represents a file or directory entry in the file browser @@ -218,25 +220,33 @@ func (s *AdminServer) GetFileBrowser(dir string, 
lastFileName string, pageSize i } } - // Check if this is a bucket path + // Check if this is a bucket path and determine if it's a table bucket isBucketPath := false bucketName := "" + isTableBucketPath := false + tableBucketName := "" if strings.HasPrefix(dir, "/buckets/") { isBucketPath = true pathParts := strings.Split(strings.Trim(dir, "/"), "/") if len(pathParts) >= 2 { bucketName = pathParts[1] - } - } - - // Check if this is a table bucket path - isTableBucketPath := false - tableBucketName := "" - if strings.HasPrefix(dir, "/table-buckets/") { - isTableBucketPath = true - pathParts := strings.Split(strings.Trim(dir, "/"), "/") - if len(pathParts) >= 2 { - tableBucketName = pathParts[1] + // Check table bucket status early to avoid second WithFilerClient call + if err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + resp, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: "/buckets", + Name: bucketName, + }) + if err != nil { + return err + } + if s3tables.IsTableBucketEntry(resp.Entry) { + isTableBucketPath = true + tableBucketName = bucketName + } + return nil + }); err != nil { + glog.V(1).Infof("file browser table bucket lookup failed for %s: %v", bucketName, err) + } } } @@ -287,12 +297,8 @@ func (s *AdminServer) generateBreadcrumbs(dir string) []BreadcrumbItem { displayName := part if len(breadcrumbs) == 1 && part == "buckets" { displayName = "Object Store Buckets" - } else if len(breadcrumbs) == 1 && part == "table-buckets" { - displayName = "Table Buckets" } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") { displayName = "📦 " + part // Add bucket icon to bucket name - } else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") { - displayName = "🧊 " + part } breadcrumbs = append(breadcrumbs, BreadcrumbItem{ diff --git a/weed/admin/dash/file_browser_data_test.go b/weed/admin/dash/file_browser_data_test.go index e02465034..0605735af 
100644 --- a/weed/admin/dash/file_browser_data_test.go +++ b/weed/admin/dash/file_browser_data_test.go @@ -51,15 +51,6 @@ func TestGenerateBreadcrumbs(t *testing.T) { {Name: "📦 mybucket", Path: "/buckets/mybucket"}, }, }, - { - name: "table bucket path", - path: "/table-buckets/mytablebucket", - expected: []BreadcrumbItem{ - {Name: "Root", Path: "/"}, - {Name: "Table Buckets", Path: "/table-buckets"}, - {Name: "🧊 mytablebucket", Path: "/table-buckets/mytablebucket"}, - }, - }, { name: "bucket nested path", path: "/buckets/mybucket/folder", @@ -70,16 +61,6 @@ func TestGenerateBreadcrumbs(t *testing.T) { {Name: "folder", Path: "/buckets/mybucket/folder"}, }, }, - { - name: "table bucket nested path", - path: "/table-buckets/mytablebucket/folder", - expected: []BreadcrumbItem{ - {Name: "Root", Path: "/"}, - {Name: "Table Buckets", Path: "/table-buckets"}, - {Name: "🧊 mytablebucket", Path: "/table-buckets/mytablebucket"}, - {Name: "folder", Path: "/table-buckets/mytablebucket/folder"}, - }, - }, { name: "path with trailing slash", path: "/folder/", @@ -195,11 +176,6 @@ func TestParentPathCalculationLogic(t *testing.T) { currentDir: "/buckets/mybucket", expected: "/buckets", }, - { - name: "table bucket directory", - currentDir: "/table-buckets/mytablebucket", - expected: "/table-buckets", - }, } for _, tt := range tests { diff --git a/weed/admin/dash/s3tables_management.go b/weed/admin/dash/s3tables_management.go index fd9e4334b..a77a8201c 100644 --- a/weed/admin/dash/s3tables_management.go +++ b/weed/admin/dash/s3tables_management.go @@ -100,6 +100,9 @@ func (s *AdminServer) GetS3TablesBucketsData(ctx context.Context) (S3TablesBucke if strings.HasPrefix(entry.Entry.Name, ".") { continue } + if !s3tables.IsTableBucketEntry(entry.Entry) { + continue + } metaBytes, ok := entry.Entry.Extended[s3tables.ExtendedKeyMetadata] if !ok { continue diff --git a/weed/admin/view/app/file_browser.templ b/weed/admin/view/app/file_browser.templ index 58c999ebc..576a27ed9 100644 --- 
a/weed/admin/view/app/file_browser.templ +++ b/weed/admin/view/app/file_browser.templ @@ -14,25 +14,25 @@ script changePageSize(path string, lastFileName string) { templ FileBrowser(data dash.FileBrowserData) {

- if data.IsBucketPath && data.BucketName != "" { - S3 Bucket: {data.BucketName} - } else if data.IsTableBucketPath && data.TableBucketName != "" { - Table Bucket: {data.TableBucketName} - } else { - File Browser - } + if data.IsTableBucketPath && data.TableBucketName != "" { + Table Bucket: {data.TableBucketName} + } else if data.IsBucketPath && data.BucketName != "" { + S3 Bucket: {data.BucketName} + } else { + File Browser + }

- if data.IsBucketPath && data.BucketName != "" { - - Back to Buckets - - } else if data.IsTableBucketPath && data.TableBucketName != "" { - - Back to Table Buckets - - } + if data.IsTableBucketPath && data.TableBucketName != "" { + + Back to Table Buckets + + } else if data.IsBucketPath && data.BucketName != "" { + + Back to Buckets + + } @@ -85,12 +85,7 @@ templ FileBrowser(data dash.FileBrowserData) { Manage Buckets - } else if data.CurrentPath == "/table-buckets" { - Table Buckets Directory - - Manage Table Buckets - - } else { + } else { { filepath.Base(data.CurrentPath) } } diff --git a/weed/admin/view/app/file_browser_templ.go b/weed/admin/view/app/file_browser_templ.go index 8e8793e99..230471fc1 100644 --- a/weed/admin/view/app/file_browser_templ.go +++ b/weed/admin/view/app/file_browser_templ.go @@ -50,29 +50,29 @@ func FileBrowser(data dash.FileBrowserData) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - if data.IsBucketPath && data.BucketName != "" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "S3 Bucket: ") + if data.IsTableBucketPath && data.TableBucketName != "" { + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "Table Bucket: ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var2 string - templ_7745c5c3_Var2, templ_7745c5c3_Err = templ.JoinStringErrs(data.BucketName) + templ_7745c5c3_Var2, templ_7745c5c3_Err = templ.JoinStringErrs(data.TableBucketName) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/file_browser.templ`, Line: 18, Col: 63} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/file_browser.templ`, Line: 18, Col: 72} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var2)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - } else if data.IsTableBucketPath && data.TableBucketName != "" { - templ_7745c5c3_Err = 
templruntime.WriteString(templ_7745c5c3_Buffer, 3, "Table Bucket: ") + } else if data.IsBucketPath && data.BucketName != "" { + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 3, "S3 Bucket: ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } var templ_7745c5c3_Var3 string - templ_7745c5c3_Var3, templ_7745c5c3_Err = templ.JoinStringErrs(data.TableBucketName) + templ_7745c5c3_Var3, templ_7745c5c3_Err = templ.JoinStringErrs(data.BucketName) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/file_browser.templ`, Line: 20, Col: 72} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/file_browser.templ`, Line: 20, Col: 63} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var3)) if templ_7745c5c3_Err != nil { @@ -88,13 +88,13 @@ func FileBrowser(data dash.FileBrowserData) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - if data.IsBucketPath && data.BucketName != "" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "Back to Buckets ") + if data.IsTableBucketPath && data.TableBucketName != "" { + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "Back to Table Buckets ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - } else if data.IsTableBucketPath && data.TableBucketName != "" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 7, "Back to Table Buckets ") + } else if data.IsBucketPath && data.BucketName != "" { + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 7, "Back to Buckets ") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } @@ -188,11 +188,6 @@ func FileBrowser(data dash.FileBrowserData) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - } else if data.CurrentPath == "/table-buckets" { - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 19, 
"Table Buckets Directory Manage Table Buckets") - if templ_7745c5c3_Err != nil { - return templ_7745c5c3_Err - } } else { templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 20, " table path) + tableLocationLock sync.RWMutex + notFound map[string]struct{} notFoundLock sync.RWMutex s3a *S3ApiServer @@ -53,9 +59,10 @@ type BucketRegistry struct { func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry { br := &BucketRegistry{ - metadataCache: make(map[string]*BucketMetaData), - notFound: make(map[string]struct{}), - s3a: s3a, + metadataCache: make(map[string]*BucketMetaData), + tableLocationCache: make(map[string]string), + notFound: make(map[string]struct{}), + s3a: s3a, } err := br.init() if err != nil { @@ -68,6 +75,9 @@ func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry { func (r *BucketRegistry) init() error { var bucketCount int err := filer_pb.List(context.Background(), r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry != nil && strings.HasPrefix(entry.Name, ".") { + return nil + } r.LoadBucketMetadata(entry) // Also warm the bucket config cache with Object Lock and versioning settings // This ensures cache consistency across multi-filer clusters after restart @@ -96,7 +106,8 @@ func buildBucketMetadata(accountManager AccountManager, entry *filer_pb.Entry) * entryJson, _ := json.Marshal(entry) glog.V(3).Infof("build bucket metadata,entry=%s", entryJson) bucketMetadata := &BucketMetaData{ - Name: entry.Name, + Name: entry.Name, + IsTableBucket: s3tables.IsTableBucketEntry(entry), //Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced, @@ -153,6 +164,7 @@ func buildBucketMetadata(accountManager AccountManager, entry *filer_pb.Entry) * func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) { r.removeMetadataCache(entry.Name) r.unMarkNotFound(entry.Name) + r.removeTableLocationCache(entry.Name) } 
func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) { @@ -217,6 +229,12 @@ func (r *BucketRegistry) removeMetadataCache(bucket string) { delete(r.metadataCache, bucket) } +func (r *BucketRegistry) removeTableLocationCache(bucket string) { + r.tableLocationLock.Lock() + defer r.tableLocationLock.Unlock() + delete(r.tableLocationCache, bucket) +} + func (r *BucketRegistry) markNotFound(bucket string) { r.notFoundLock.Lock() defer r.notFoundLock.Unlock() diff --git a/weed/s3api/bucket_paths.go b/weed/s3api/bucket_paths.go new file mode 100644 index 000000000..5c7b9520a --- /dev/null +++ b/weed/s3api/bucket_paths.go @@ -0,0 +1,118 @@ +package s3api + +import ( + "errors" + "path" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" +) + +func (s3a *S3ApiServer) isTableBucket(bucket string) bool { + if bucket == "" { + return false + } + + // Check cache first + if s3a.bucketRegistry != nil { + s3a.bucketRegistry.metadataCacheLock.RLock() + if metadata, ok := s3a.bucketRegistry.metadataCache[bucket]; ok { + s3a.bucketRegistry.metadataCacheLock.RUnlock() + return metadata.IsTableBucket + } + s3a.bucketRegistry.metadataCacheLock.RUnlock() + } + + entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + if err == nil && entry != nil { + if s3a.bucketRegistry != nil { + s3a.bucketRegistry.LoadBucketMetadata(entry) + } + return s3tables.IsTableBucketEntry(entry) + } + + if err != nil && !errors.Is(err, filer_pb.ErrNotFound) { + glog.V(1).Infof("bucket lookup failed for %s: %v", bucket, err) + } + return false +} + +func (s3a *S3ApiServer) tableLocationDir(bucket string) (string, bool) { + if bucket == "" { + return "", false + } + + // Check cache first + if s3a.bucketRegistry != nil { + s3a.bucketRegistry.tableLocationLock.RLock() + if tablePath, ok := s3a.bucketRegistry.tableLocationCache[bucket]; ok { + 
s3a.bucketRegistry.tableLocationLock.RUnlock() + return tablePath, tablePath != "" + } + s3a.bucketRegistry.tableLocationLock.RUnlock() + } + + entry, err := s3a.getEntry(s3tables.GetTableLocationMappingDir(), bucket) + tablePath := "" + if err == nil && entry != nil && len(entry.Content) > 0 { + tablePath = strings.TrimSpace(string(entry.Content)) + } + + // Only cache definitive results: successful lookup (tablePath set) or definitive not-found (ErrNotFound) + // Don't cache transient errors to avoid treating temporary failures as permanent misses + if err == nil || errors.Is(err, filer_pb.ErrNotFound) { + if s3a.bucketRegistry != nil { + s3a.bucketRegistry.tableLocationLock.Lock() + s3a.bucketRegistry.tableLocationCache[bucket] = tablePath + s3a.bucketRegistry.tableLocationLock.Unlock() + } + } + + if tablePath == "" { + if err != nil && !errors.Is(err, filer_pb.ErrNotFound) { + glog.V(1).Infof("table location mapping lookup failed for %s: %v", bucket, err) + } + return "", false + } + + return tablePath, true +} + +func (s3a *S3ApiServer) bucketRoot(bucket string) string { + // Returns the unified buckets root path for all bucket types + return s3a.option.BucketsPath +} + +func (s3a *S3ApiServer) bucketDir(bucket string) string { + if tablePath, ok := s3a.tableLocationDir(bucket); ok { + return tablePath + } + if s3a.isTableBucket(bucket) { + return s3tables.GetTableObjectBucketPath(bucket) + } + return path.Join(s3a.bucketRoot(bucket), bucket) +} + +func (s3a *S3ApiServer) bucketPrefix(bucket string) string { + return s3a.bucketDir(bucket) + "/" +} + +func (s3a *S3ApiServer) bucketExists(bucket string) (bool, error) { + entry, err := s3a.getBucketEntry(bucket) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + return false, nil + } + return false, err + } + return entry != nil, nil +} + +func (s3a *S3ApiServer) getBucketEntry(bucket string) (*filer_pb.Entry, error) { + if tablePath, ok := s3a.tableLocationDir(bucket); ok { + return 
s3a.getEntry(path.Dir(tablePath), path.Base(tablePath)) + } + return s3a.getEntry(s3a.option.BucketsPath, bucket) +} diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index e787bf18d..4ff74c6d0 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -591,7 +591,7 @@ func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInpu dirName = "" } dirName = strings.TrimPrefix(dirName, "/") - dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) + dirName = fmt.Sprintf("%s/%s", s3a.bucketDir(*input.Bucket), dirName) // remove suffix '/' dirName = strings.TrimSuffix(dirName, "/") diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index f0e454a51..96a6230bf 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "os" + "path" "strings" "time" @@ -172,16 +173,16 @@ func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc { } // saveMetadataFile saves the Iceberg metadata JSON file to the filer. -// It constructs the correct filler path from the S3 location components. -func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, tableName, metadataFileName string, content []byte) error { - // Construct filer path: /table-buckets////metadata/ - // Note: s3tables.TablesPath is "/table-buckets" +// It constructs the filer path from the S3 location components. 
+func (s *Server) saveMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string, content []byte) error { // Create context with timeout for file operations opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + bucketsPath := s3tables.TablesPath + ensureDir := func(parent, name, errorContext string) error { _, err := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, @@ -216,19 +217,33 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, ta return nil } - // 1. Ensure table directory exists: /table-buckets///
- tableDir := fmt.Sprintf("/table-buckets/%s/%s/%s", bucketName, namespace, tableName) - if err := ensureDir(fmt.Sprintf("/table-buckets/%s/%s", bucketName, namespace), tableName, "table directory"); err != nil { + // 1. Ensure bucket directory exists: / + if err := ensureDir(bucketsPath, bucketName, "bucket directory"); err != nil { return err } - // 2. Ensure metadata directory exists: /table-buckets///
/metadata - metadataDir := fmt.Sprintf("%s/metadata", tableDir) + // 2. Ensure table path exists: // + tableDir := path.Join(bucketsPath, bucketName) + if tablePath != "" { + segments := strings.Split(tablePath, "/") + for _, segment := range segments { + if segment == "" { + continue + } + if err := ensureDir(tableDir, segment, "table directory"); err != nil { + return err + } + tableDir = path.Join(tableDir, segment) + } + } + + // 3. Ensure metadata directory exists: ///metadata + metadataDir := path.Join(tableDir, "metadata") if err := ensureDir(tableDir, "metadata", "metadata directory"); err != nil { return err } - // 3. Write the file + // 4. Write the file resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ Directory: metadataDir, Entry: &filer_pb.Entry{ @@ -278,6 +293,34 @@ func encodeNamespace(parts []string) string { return strings.Join(parts, "\x1F") } +func parseS3Location(location string) (bucketName, tablePath string, err error) { + if !strings.HasPrefix(location, "s3://") { + return "", "", fmt.Errorf("unsupported location: %s", location) + } + trimmed := strings.TrimPrefix(location, "s3://") + trimmed = strings.TrimSuffix(trimmed, "/") + if trimmed == "" { + return "", "", fmt.Errorf("invalid location: %s", location) + } + parts := strings.SplitN(trimmed, "/", 2) + bucketName = parts[0] + if bucketName == "" { + return "", "", fmt.Errorf("invalid location bucket: %s", location) + } + if len(parts) == 2 { + tablePath = parts[1] + } + return bucketName, tablePath, nil +} + +func tableLocationFromMetadataLocation(metadataLocation string) string { + trimmed := strings.TrimSuffix(metadataLocation, "/") + if idx := strings.LastIndex(trimmed, "/metadata/"); idx != -1 { + return trimmed[:idx] + } + return trimmed +} + // writeJSON writes a JSON response. 
func writeJSON(w http.ResponseWriter, status int, v interface{}) { w.Header().Set("Content-Type", "application/json") @@ -637,7 +680,24 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { // Generate UUID for the new table tableUUID := uuid.New() - location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name) + location := strings.TrimSuffix(req.Location, "/") + tablePath := path.Join(encodeNamespace(namespace), req.Name) + storageBucket := bucketName + tableLocationBucket := "" + if location != "" { + parsedBucket, parsedPath, err := parseS3Location(location) + if err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid table location: "+err.Error()) + return + } + if strings.HasSuffix(parsedBucket, "--table-s3") && parsedPath == "" { + tableLocationBucket = parsedBucket + } + } + if tableLocationBucket == "" { + tableLocationBucket = fmt.Sprintf("%s--table-s3", tableUUID.String()) + } + location = fmt.Sprintf("s3://%s", tableLocationBucket) // Build proper Iceberg table metadata using iceberg-go types metadata := newTableMetadata(tableUUID, location, req.Schema, req.PartitionSpec, req.WriteOrder, req.Properties) @@ -656,12 +716,12 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { // 1. 
Save metadata file to filer tableName := req.Name metadataFileName := "v1.metadata.json" // Initial version is always 1 - if err := s.saveMetadataFile(r.Context(), bucketName, encodeNamespace(namespace), tableName, metadataFileName, metadataBytes); err != nil { + if err := s.saveMetadataFile(r.Context(), storageBucket, tablePath, metadataFileName, metadataBytes); err != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) return } - metadataLocation := fmt.Sprintf("s3://%s/%s/%s/metadata/%s", bucketName, encodeNamespace(namespace), tableName, metadataFileName) + metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName) // Use S3 Tables manager to create table createReq := &s3tables.CreateTableRequest{ @@ -750,7 +810,10 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { } // Build table metadata using iceberg-go types - location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) + location := tableLocationFromMetadataLocation(getResp.MetadataLocation) + if location == "" { + location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) + } tableUUID := uuid.Nil if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" { if parsed, err := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); err == nil { @@ -911,7 +974,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } // Build the current metadata - location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) + location := tableLocationFromMetadataLocation(getResp.MetadataLocation) + if location == "" { + location = fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName) + } tableUUID := uuid.Nil if getResp.Metadata != nil && getResp.Metadata.Iceberg != nil && getResp.Metadata.Iceberg.TableUUID != "" { if parsed, 
err := uuid.Parse(getResp.Metadata.Iceberg.TableUUID); err == nil { @@ -973,8 +1039,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // Determine next metadata version metadataVersion := getResp.MetadataVersion + 1 metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion) - newMetadataLocation := fmt.Sprintf("s3://%s/%s/%s/metadata/%s", - bucketName, encodeNamespace(namespace), tableName, metadataFileName) + newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName) // Serialize metadata to JSON metadataBytes, err := json.Marshal(newMetadata) @@ -984,7 +1049,8 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } // 1. Save metadata file to filer - if err := s.saveMetadataFile(r.Context(), bucketName, encodeNamespace(namespace), tableName, metadataFileName, metadataBytes); err != nil { + tablePath := path.Join(encodeNamespace(namespace), tableName) + if err := s.saveMetadataFile(r.Context(), bucketName, tablePath, metadataFileName, metadataBytes); err != nil { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error()) return } diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go index afe02f78c..aad6d03f3 100644 --- a/weed/s3api/s3api_bucket_config.go +++ b/weed/s3api/s3api_bucket_config.go @@ -355,7 +355,7 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err } // Try to get from filer - entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + entry, err := s3a.getBucketEntry(bucket) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { // Bucket doesn't exist - set negative cache @@ -471,7 +471,7 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC // Save to filer glog.V(3).Infof("updateBucketConfig: saving entry to filer for bucket %s", bucket) - err := 
s3a.updateEntry(s3a.option.BucketsPath, config.Entry) + err := s3a.updateEntry(s3a.bucketRoot(bucket), config.Entry) if err != nil { glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err) return s3err.ErrInternalError @@ -801,7 +801,7 @@ func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetad } // Get bucket directory entry to access its content - entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + entry, err := s3a.getBucketEntry(bucket) if err != nil { // Check if this is a "not found" error if errors.Is(err, filer_pb.ErrNotFound) { @@ -880,7 +880,7 @@ func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadat // Update the bucket entry with new content err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // Get current bucket entry - entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + entry, err := s3a.getBucketEntry(bucket) if err != nil { return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err) } @@ -892,7 +892,7 @@ func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadat entry.Content = metadataBytes request := &filer_pb.UpdateEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Entry: entry, } diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index b64858ad8..2c4a1a884 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -73,6 +73,9 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques var listBuckets ListAllMyBucketsList for _, entry := range entries { if entry.IsDirectory { + if strings.HasPrefix(entry.Name, ".") { + continue + } // Unauthenticated users should not see any buckets if identity == nil { continue @@ -184,6 +187,10 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) // Check collection existence first 
collectionExists := false + if s3a.isTableBucket(bucket) { + s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) + return + } if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{ IncludeEcVolumes: true, @@ -331,6 +338,11 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("DeleteBucketHandler %s", bucket) + if s3a.isTableBucket(bucket) { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return + } + if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return @@ -430,7 +442,7 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("HeadBucketHandler %s", bucket) - if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) { + if entry, err := s3a.getBucketEntry(bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } diff --git a/weed/s3api/s3api_bucket_handlers_object_lock_config.go b/weed/s3api/s3api_bucket_handlers_object_lock_config.go index a6b422617..d212aba2e 100644 --- a/weed/s3api/s3api_bucket_handlers_object_lock_config.go +++ b/weed/s3api/s3api_bucket_handlers_object_lock_config.go @@ -119,7 +119,7 @@ func (s3a *S3ApiServer) GetObjectLockConfigurationHandler(w http.ResponseWriter, // If no cached Object Lock configuration, reload entry from filer to get the latest extended attributes // This handles cases where the cache might have a stale entry due to timing issues with metadata updates glog.V(3).Infof("GetObjectLockConfigurationHandler: no cached ObjectLockConfig, reloading entry from filer for %s", bucket) - freshEntry, err := 
s3a.getEntry(s3a.option.BucketsPath, bucket) + freshEntry, err := s3a.getBucketEntry(bucket) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { glog.V(1).Infof("GetObjectLockConfigurationHandler: bucket %s not found while reloading entry", bucket) diff --git a/weed/s3api/s3api_bucket_policy_handlers.go b/weed/s3api/s3api_bucket_policy_handlers.go index a42de4442..9ffb45ced 100644 --- a/weed/s3api/s3api_bucket_policy_handlers.go +++ b/weed/s3api/s3api_bucket_policy_handlers.go @@ -32,7 +32,7 @@ func (s3a *S3ApiServer) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Re glog.V(3).Infof("GetBucketPolicyHandler: bucket=%s", bucket) // Validate bucket exists first for correct error mapping - _, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + _, err := s3a.getBucketEntry(bucket) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -137,7 +137,7 @@ func (s3a *S3ApiServer) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http glog.V(3).Infof("DeleteBucketPolicyHandler: bucket=%s", bucket) // Validate bucket exists first for correct error mapping - _, err := s3a.getEntry(s3a.option.BucketsPath, bucket) + _, err := s3a.getBucketEntry(bucket) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) @@ -196,7 +196,7 @@ func (s3a *S3ApiServer) getBucketPolicy(bucket string) (*policy_engine.PolicyDoc var policyDoc policy_engine.PolicyDocument err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Name: bucket, }) if err != nil { @@ -238,7 +238,7 @@ func (s3a *S3ApiServer) setBucketPolicy(bucket string, policyDoc *policy_engine. 
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // First, get the current entry to preserve other attributes resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Name: bucket, }) if err != nil { @@ -255,7 +255,7 @@ func (s3a *S3ApiServer) setBucketPolicy(bucket string, policyDoc *policy_engine. // Update the entry with new metadata _, err = client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Entry: entry, }) @@ -268,7 +268,7 @@ func (s3a *S3ApiServer) deleteBucketPolicy(bucket string) error { return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // Get the current entry resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Name: bucket, }) if err != nil { @@ -285,7 +285,7 @@ func (s3a *S3ApiServer) deleteBucketPolicy(bucket string) error { // Update the entry _, err = client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ - Directory: s3a.option.BucketsPath, + Directory: s3a.bucketRoot(bucket), Entry: entry, }) diff --git a/weed/s3api/s3api_implicit_directory_test.go b/weed/s3api/s3api_implicit_directory_test.go index e7c3633fc..33d16c149 100644 --- a/weed/s3api/s3api_implicit_directory_test.go +++ b/weed/s3api/s3api_implicit_directory_test.go @@ -73,8 +73,8 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) { isDirectory: true, hasChildren: false, versioningEnabled: false, - shouldReturn404: false, - description: "Should return 200 for empty directory", + shouldReturn404: true, + description: "Should return 404 for empty directory", }, { name: "Regular file: non-zero size", @@ -116,10 +116,11 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) 
{ // Test the logic: should we return 404? // Logic from HeadObjectHandler: // if !versioningConfigured && !strings.HasSuffix(object, "/") { - // if isZeroByteFile || isActualDirectory { - // if hasChildren { - // return 404 - // } + // if isActualDirectory { + // return 404 + // } + // if isZeroByteFile && hasChildren { + // return 404 // } // } @@ -128,10 +129,10 @@ func TestImplicitDirectoryBehaviorLogic(t *testing.T) { shouldReturn404 := false if !tt.versioningEnabled && !tt.hasTrailingSlash { - if isZeroByteFile || isActualDirectory { - if tt.hasChildren { - shouldReturn404 = true - } + if isActualDirectory { + shouldReturn404 = true + } else if isZeroByteFile && tt.hasChildren { + shouldReturn404 = true } } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 39cde56ac..2e0a732d0 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -308,7 +308,8 @@ func removeDuplicateSlashes(object string) string { return result.String() } -// hasChildren checks if a path has any child objects (is a directory with contents) +// hasChildren checks if a path has any child objects (is a directory with contents). +// On unexpected errors, it logs and conservatively returns true to avoid hiding entries. // // This helper function is used to distinguish implicit directories from regular files or empty directories. // An implicit directory is one that exists only because it has children, not because it was explicitly created. 
@@ -333,7 +334,7 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { cleanPrefix := strings.TrimPrefix(prefix, "/") // The directory to list is bucketDir + cleanPrefix - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) fullPath := bucketDir + "/" + cleanPrefix // Try to list one child object in the directory @@ -361,7 +362,14 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { }) // If we got an entry (not EOF), then it has children - return err == nil + if err == nil { + return true + } + if errors.Is(err, io.EOF) || errors.Is(err, filer_pb.ErrNotFound) { + return false + } + glog.V(1).Infof("hasChildren: list entries failed for %s/%s: %v", bucket, cleanPrefix, err) + return true } // checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists @@ -374,7 +382,7 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E return nil, false, nil // Not a directory object } - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) cleanObject := strings.TrimSuffix(object, "/") if cleanObject == "" { @@ -416,7 +424,7 @@ func (s3a *S3ApiServer) resolveObjectEntry(bucket, object string) (*filer_pb.Ent } // For non-versioned buckets, verify directly - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) return s3a.getEntry(bucketDir, object) } @@ -542,7 +550,7 @@ func (s3a *S3ApiServer) toFilerPath(bucket, object string) string { // Returns the raw file path - no URL escaping needed // The path is used directly, not embedded in a URL object = s3_constants.NormalizeObjectKey(object) - return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object) + return fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object) } // hasConditionalHeaders checks if the request has any conditional headers @@ -659,7 +667,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r 
*http.Request) // - If .versions/ exists: real versions available, use getLatestObjectVersion // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly // - If transient error: fall back to getLatestObjectVersion which has retry logic - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) normalizedObject := s3_constants.NormalizeObjectKey(object) versionsDir := normalizedObject + s3_constants.VersionsFolder @@ -2135,7 +2143,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // - If .versions/ exists: real versions available, use getLatestObjectVersion // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly // - If transient error: fall back to getLatestObjectVersion which has retry logic - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) normalizedObject := s3_constants.NormalizeObjectKey(object) versionsDir := normalizedObject + s3_constants.VersionsFolder @@ -2280,7 +2288,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // // Edge Cases Handled: // - Empty files (0-byte, no children) → 200 OK (legitimate empty file) - // - Empty directories (no children) → 200 OK (legitimate empty directory) + // - Empty directories (no children) → 404 Not Found (directories are not objects) // - Explicit directory requests (trailing slash) → 200 OK (handled earlier) // - Versioned objects → Skip this check (different semantics) // @@ -2293,9 +2301,11 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // PyArrow may create 0-byte files when writing datasets, or the filer may have actual directories if objectEntryForSSE.Attributes != nil { isZeroByteFile := objectEntryForSSE.Attributes.FileSize == 0 && !objectEntryForSSE.IsDirectory - isActualDirectory := objectEntryForSSE.IsDirectory - - if isZeroByteFile || 
isActualDirectory { + if objectEntryForSSE.IsDirectory { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + if isZeroByteFile { // Check if it has children (making it an implicit directory) if s3a.hasChildren(bucket, object) { // This is an implicit directory with children @@ -2414,7 +2424,7 @@ func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bod // fetchObjectEntry fetches the filer entry for an object // Returns nil if not found (not an error), or propagates other errors func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry, error) { - objectPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object) + objectPath := fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object) fetchedEntry, fetchErr := s3a.getEntry("", objectPath) if fetchErr != nil { if errors.Is(fetchErr, filer_pb.ErrNotFound) { @@ -2428,7 +2438,7 @@ func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry // fetchObjectEntryRequired fetches the filer entry for an object // Returns an error if the object is not found or any other error occurs func (s3a *S3ApiServer) fetchObjectEntryRequired(bucket, object string) (*filer_pb.Entry, error) { - objectPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object) + objectPath := fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object) fetchedEntry, fetchErr := s3a.getEntry("", objectPath) if fetchErr != nil { return nil, fetchErr // Return error for both not-found and other errors @@ -3350,7 +3360,7 @@ func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) // buildRemoteObjectPath builds the filer directory and object name from S3 bucket/object. // This is shared by all remote object caching functions. 
func (s3a *S3ApiServer) buildRemoteObjectPath(bucket, object string) (dir, name string) { - dir = s3a.option.BucketsPath + "/" + bucket + dir = s3a.bucketDir(bucket) name = s3_constants.NormalizeObjectKey(object) if idx := strings.LastIndex(name, "/"); idx > 0 { dir = dir + "/" + name[:idx] @@ -3418,7 +3428,7 @@ func (s3a *S3ApiServer) cacheRemoteObjectForStreaming(r *http.Request, entry *fi if versionId != "" && versionId != "null" { // This is a specific version - entry is located at /buckets//.versions/v_ normalizedObject := s3_constants.NormalizeObjectKey(object) - dir = s3a.option.BucketsPath + "/" + bucket + "/" + normalizedObject + s3_constants.VersionsFolder + dir = s3a.bucketDir(bucket) + "/" + normalizedObject + s3_constants.VersionsFolder name = s3a.getVersionFileName(versionId) } else { // Non-versioned object or "null" version - lives at the main path diff --git a/weed/s3api/s3api_object_handlers_acl.go b/weed/s3api/s3api_object_handlers_acl.go index c5c9eeae3..5ef75a46b 100644 --- a/weed/s3api/s3api_object_handlers_acl.go +++ b/weed/s3api/s3api_object_handlers_acl.go @@ -306,7 +306,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque if versioningConfigured { if versionId != "" && versionId != "null" { // Versioned object - update the specific version file in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } else { // Latest version in versioned bucket - could be null version or versioned object // Extract version ID from the entry to determine where it's stored @@ -319,15 +319,15 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque if actualVersionId == "null" || actualVersionId == "" { // Null version (pre-versioning object) - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = 
s3a.bucketDir(bucket) } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } } } else { // Non-versioned object - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = s3a.bucketDir(bucket) } // Update the object with new ACL metadata diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index abb1b7bd5..5277534f1 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -65,7 +65,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request replaceMeta, replaceTagging := replaceDirective(r.Header) if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) { - fullPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)) + fullPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)) dir, name := fullPath.DirAndName() entry, err := s3a.getEntry(dir, name) if err != nil || entry.IsDirectory { @@ -116,7 +116,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } else if srcVersioningState == s3_constants.VersioningSuspended { // Versioning suspended - current object is stored as regular file ("null" version) // Try regular file first, fall back to latest version if needed - srcPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)) + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) dir, name := srcPath.DirAndName() entry, err = s3a.getEntry(dir, name) if err != nil { @@ -126,7 +126,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } } else { // No versioning configured - 
use regular retrieval - srcPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)) + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) dir, name := srcPath.DirAndName() entry, err = s3a.getEntry(dir, name) } @@ -160,8 +160,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request var sourceMd5 []byte if entry.Attributes != nil && len(entry.Attributes.Md5) > 0 { sourceMd5 = append([]byte(nil), entry.Attributes.Md5...) - srcPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject) - dstPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject) + srcPath := fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject) + dstPath := fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject) state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath) s3a.applyCopyBucketDefaultEncryption(state, dstBucket) if strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r); err == nil && strategy == CopyStrategyDirect { @@ -310,7 +310,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request // Calculate ETag for versioning filerEntry := &filer.Entry{ - FullPath: util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)), + FullPath: util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)), Attr: filer.Attr{ FileSize: dstEntry.Attributes.FileSize, Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), @@ -328,7 +328,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request // Create version file versionFileName := s3a.getVersionFileName(dstVersionId) versionObjectPath := dstObject + ".versions/" + versionFileName - bucketDir := s3a.option.BucketsPath + "/" + dstBucket + bucketDir := s3a.bucketDir(dstBucket) if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { entry.Attributes = 
dstEntry.Attributes @@ -354,7 +354,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request // Remove any versioning-related metadata from source that shouldn't carry over cleanupVersioningMetadata(dstEntry.Extended) - dstPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)) + dstPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)) dstDir, dstName := dstPath.DirAndName() // Check if destination exists and remove it first (S3 copy overwrites) @@ -523,7 +523,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } else if srcVersioningState == s3_constants.VersioningSuspended { // Versioning suspended - current object is stored as regular file ("null" version) // Try regular file first, fall back to latest version if needed - srcPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)) + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) dir, name := srcPath.DirAndName() entry, err = s3a.getEntry(dir, name) if err != nil { @@ -533,7 +533,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } } else { // No versioning configured - use regular retrieval - srcPath := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)) + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) dir, name := srcPath.DirAndName() entry, err = s3a.getEntry(dir, name) } diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go index 0efda0de6..223e42ac9 100644 --- a/weed/s3api/s3api_object_handlers_copy_unified.go +++ b/weed/s3api/s3api_object_handlers_copy_unified.go @@ -15,8 +15,8 @@ import ( // Returns chunks and destination metadata that should be applied to the destination entry func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, 
r *http.Request, srcBucket, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) { // Detect encryption state (using entry-aware detection for multipart objects) - srcPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject) - dstPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject) + srcPath := fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject) + dstPath := fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject) state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath) // Debug logging for encryption state diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index 8618933df..d802ecf06 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -121,7 +121,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return } - target := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object)) + target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { @@ -333,7 +333,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h entryName = object.Key[lastSeparator+1:] parentDirectoryPath = object.Key[:lastSeparator] } - parentDirectoryPath = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) + parentDirectoryPath = fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), parentDirectoryPath) err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index beda56e69..1a4cf84dd 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -86,7 +86,7 @@ func (s3a *S3ApiServer) 
ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ } if len(response.Contents) == 0 { - if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists { + if exists, existErr := s3a.bucketExists(bucket); existErr == nil && !exists { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } @@ -150,7 +150,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ } if len(response.Contents) == 0 { - if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists { + if exists, existErr := s3a.bucketExists(bucket); existErr == nil && !exists { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } @@ -163,7 +163,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys uint16, originalMarker string, delimiter string, encodingTypeUrl bool, fetchOwner bool) (response ListBucketResult, err error) { // convert full path prefix into directory name and prefix for entry name requestDir, prefix, marker := normalizePrefixMarker(originalPrefix, originalMarker) - bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) + bucketPrefix := s3a.bucketPrefix(bucket) reqDir := bucketPrefix[:len(bucketPrefix)-1] if requestDir != "" { reqDir = fmt.Sprintf("%s%s", bucketPrefix, requestDir) @@ -230,6 +230,16 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m empty = false dirName, entryName, _ := entryUrlEncode(dir, entry.Name, encodingTypeUrl) if entry.IsDirectory { + if originalPrefix != "" { + normalizedPrefix := strings.TrimPrefix(strings.TrimSuffix(originalPrefix, "/"), "/") + if normalizedPrefix != "" { + relativePath := strings.TrimPrefix(fmt.Sprintf("%s/%s", dir, entry.Name), bucketPrefix) + relativePath = strings.TrimPrefix(relativePath, "/") + if normalizedPrefix == relativePath && 
!s3a.hasChildren(bucket, relativePath) && !entry.IsDirectoryKeyObject() { + return + } + } + } // When delimiter is specified, apply delimiter logic to directory key objects too if delimiter != "" && entry.IsDirectoryKeyObject() { // Apply the same delimiter logic as for regular files @@ -332,6 +342,11 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m } }) if doErr != nil { + if errors.Is(doErr, filer_pb.ErrNotFound) { + empty = true + nextMarker = "" + break + } return doErr } @@ -508,7 +523,11 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d defer cancel() stream, listErr := client.ListEntries(ctx, request) if listErr != nil { - err = fmt.Errorf("list entires %+v: %v", request, listErr) + if errors.Is(listErr, filer_pb.ErrNotFound) { + err = filer_pb.ErrNotFound + return + } + err = fmt.Errorf("list entries %+v: %w", request, listErr) return } @@ -518,7 +537,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d if recvErr == io.EOF { break } else { - err = fmt.Errorf("iterating entires %+v: %v", request, recvErr) + err = fmt.Errorf("iterating entries %+v: %v", request, recvErr) return } } @@ -552,7 +571,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d // Extract object name from .versions directory name baseObjectName := strings.TrimSuffix(entry.Name, s3_constants.VersionsFolder) // Construct full object path relative to bucket - bucketFullPath := s3a.option.BucketsPath + "/" + bucket + bucketFullPath := s3a.bucketDir(bucket) bucketRelativePath := strings.TrimPrefix(dir, bucketFullPath) bucketRelativePath = strings.TrimPrefix(bucketRelativePath, "/") var fullObjectPath string diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 88e754461..6b085e499 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -452,7 
+452,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { - return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder) + return fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), s3_constants.MultipartUploadsFolder) } func (s3a *S3ApiServer) genPartUploadPath(bucket, uploadID string, partID int) string { diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index 58e2a89ac..102f20022 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -114,7 +114,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - filePath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object) + filePath := fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object) // Get ContentType from post formData // Otherwise from formFile ContentType diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 6941a1b9a..e9c51c711 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -130,7 +130,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) dirName = strings.TrimPrefix(dirName, "/") // Construct full directory path - fullDirPath := s3a.option.BucketsPath + "/" + bucket + fullDirPath := s3a.bucketDir(bucket) if dirName != "" { fullDirPath = fullDirPath + "/" + dirName } @@ -828,7 +828,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s", bucket, object, normalizedObject) - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) // Check if there's an existing null version in .versions directory and delete it // This ensures suspended 
versioning properly overwrites the null version as per S3 spec @@ -946,7 +946,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers // when a new "null" version becomes the latest during suspended versioning func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error { - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := object + s3_constants.VersionsFolder versionsDir := bucketDir + "/" + versionsObjectPath @@ -1031,7 +1031,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // We need to construct the object path relative to the bucket versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName versionFilePath := s3a.toFilerPath(bucket, versionObjectPath) - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) body := dataReader if objectContentType == "" { @@ -1112,7 +1112,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version // versionEntry contains the metadata (size, ETag, mtime, owner) to cache for single-scan list efficiency func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string, versionEntry *filer_pb.Entry) error { - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := object + s3_constants.VersionsFolder // Get the current .versions directory entry with retry logic for filer consistency diff --git a/weed/s3api/s3api_object_handlers_tagging.go b/weed/s3api/s3api_object_handlers_tagging.go index 647545254..f3ef4a73b 100644 --- a/weed/s3api/s3api_object_handlers_tagging.go +++ 
b/weed/s3api/s3api_object_handlers_tagging.go @@ -66,7 +66,7 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R } } else { // Handle regular (non-versioned) object tagging retrieval - target := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object)) + target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) dir, name := target.DirAndName() tags, err := s3a.getTags(dir, name) @@ -170,7 +170,7 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R } } else { // Handle regular (non-versioned) object tagging modification - target := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object)) + target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) dir, name := target.DirAndName() if err = s3a.setTags(dir, name, tags); err != nil { @@ -195,10 +195,10 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R // Specific version requested if versionId == "null" { // Null version (pre-versioning object) - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = s3a.bucketDir(bucket) } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } } else { // Latest version in versioned bucket - could be null version or versioned object @@ -212,10 +212,10 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R if actualVersionId == "null" || actualVersionId == "" { // Null version (pre-versioning object) - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = s3a.bucketDir(bucket) } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" 
+ object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } } @@ -308,7 +308,7 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt } } else { // Handle regular (non-versioned) object tagging deletion - target := util.FullPath(fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, object)) + target := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), object)) dir, name := target.DirAndName() err := s3a.rmTags(dir, name) @@ -334,10 +334,10 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt // Specific version requested if versionId == "null" { // Null version (pre-versioning object) - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = s3a.bucketDir(bucket) } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } } else { // Latest version in versioned bucket - could be null version or versioned object @@ -351,10 +351,10 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt if actualVersionId == "null" || actualVersionId == "" { // Null version (pre-versioning object) - stored as regular file - updateDirectory = s3a.option.BucketsPath + "/" + bucket + updateDirectory = s3a.bucketDir(bucket) } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + updateDirectory = s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } } diff --git a/weed/s3api/s3api_object_retention.go b/weed/s3api/s3api_object_retention.go index e4ef5b336..26996f5bd 100644 --- a/weed/s3api/s3api_object_retention.go +++ b/weed/s3api/s3api_object_retention.go @@ -361,7 
+361,7 @@ func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, ret // that mkFile operations are typically serialized at the filer level, but // future implementations might consider using atomic update operations or // entry-level locking for complete safety. - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) { updatedEntry.Extended = entry.Extended updatedEntry.WormEnforcedAtTsNs = entry.WormEnforcedAtTsNs @@ -453,7 +453,7 @@ func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, leg // that mkFile operations are typically serialized at the filer level, but // future implementations might consider using atomic update operations or // entry-level locking for complete safety. - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) { updatedEntry.Extended = entry.Extended }) diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index c319ce4fe..10b909442 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -152,7 +152,7 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error versionFileName := s3a.getVersionFileName(versionId) // Store delete marker in the .versions directory - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder // Create the delete marker entry in the .versions directory @@ -230,7 +230,7 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM // Recursively find all .versions directories in the bucket // Pass keyMarker and versionIdMarker to enable efficient pagination (skip entries before marker) - bucketPath := 
path.Join(s3a.option.BucketsPath, bucket) + bucketPath := s3a.bucketDir(bucket) // Memory optimization: limit collection to maxKeys+1 versions. // This works correctly for objects using the NEW inverted-timestamp format, where @@ -728,7 +728,7 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object) // All versions are now stored in the .versions directory only - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := object + s3_constants.VersionsFolder glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath) @@ -872,12 +872,12 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin if versionId == "" { // Get current version - return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject) + return s3a.getEntry(s3a.bucketDir(bucket), normalizedObject) } if versionId == "null" { // "null" version ID refers to pre-versioning objects stored as regular files - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) entry, err := s3a.getEntry(bucketDir, normalizedObject) if err != nil { return nil, fmt.Errorf("null version object %s not found: %v", normalizedObject, err) @@ -908,7 +908,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st if versionId == "null" { // Delete "null" version (pre-versioning object stored as regular file) - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) // Check if the object exists _, err := s3a.getEntry(bucketDir, normalizedObject) @@ -935,7 +935,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st versionFile := s3a.getVersionFileName(versionId) // Check if this is the latest version before attempting deletion (for potential metadata 
update) - versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+s3_constants.VersionsFolder) + versionsEntry, dirErr := s3a.getEntry(s3a.bucketDir(bucket), normalizedObject+s3_constants.VersionsFolder) isLatestVersion := false if dirErr == nil && versionsEntry.Extended != nil { if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { @@ -972,7 +972,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st // updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error { - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := object + s3_constants.VersionsFolder versionsDir := bucketDir + "/" + versionsObjectPath @@ -1118,7 +1118,7 @@ func (s3a *S3ApiServer) doGetLatestObjectVersion(bucket, object string, maxRetri // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := s3_constants.NormalizeObjectKey(object) - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := normalizedObject + s3_constants.VersionsFolder glog.V(1).Infof("doGetLatestObjectVersion: looking for latest version of %s/%s (normalized: %s, retries: %d)", bucket, object, normalizedObject, maxRetries) @@ -1306,7 +1306,7 @@ func (s3a *S3ApiServer) getLatestVersionEntryFromDirectoryEntry(bucket, object s glog.V(3).Infof("getLatestVersionEntryFromDirectoryEntry: fetching version file for %s/%s (no cached metadata)", bucket, normalizedObject) - bucketDir := path.Join(s3a.option.BucketsPath, bucket) + bucketDir := s3a.bucketDir(bucket) versionsObjectPath := path.Join(normalizedObject, s3_constants.VersionsFolder) latestVersionPath := path.Join(versionsObjectPath, latestVersionFile) latestVersionEntry, err := 
s3a.getEntry(bucketDir, latestVersionPath) diff --git a/weed/s3api/s3api_sosapi.go b/weed/s3api/s3api_sosapi.go index f12add728..53d7acdb4 100644 --- a/weed/s3api/s3api_sosapi.go +++ b/weed/s3api/s3api_sosapi.go @@ -153,7 +153,7 @@ func (s3a *S3ApiServer) getCapacityInfo(ctx context.Context, bucket string) (cap var quota int64 // getEntry communicates with filer, so errors here might mean filer connectivity issues or bucket not found // If bucket not found, we probably shouldn't be here (checked in handler), but safe to ignore - if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil && entry != nil { + if entry, getErr := s3a.getBucketEntry(bucket); getErr == nil && entry != nil { quota = entry.Quota } diff --git a/weed/s3api/s3api_version_id.go b/weed/s3api/s3api_version_id.go index 5f74d36a8..94b359c90 100644 --- a/weed/s3api/s3api_version_id.go +++ b/weed/s3api/s3api_version_id.go @@ -141,7 +141,7 @@ func compareVersionIds(a, b string) int { // getVersionedObjectDir returns the directory path for storing object versions func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string { - return s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder + return s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder } // getVersionFileName returns the filename for a specific version @@ -154,7 +154,7 @@ func (s3a *S3ApiServer) getVersionFileName(versionId string) string { // For new .versions directories, returns true (use new format). // For existing directories, infers format from the latest version ID. 
func (s3a *S3ApiServer) getVersionIdFormat(bucket, object string) bool { - bucketDir := s3a.option.BucketsPath + "/" + bucket + bucketDir := s3a.bucketDir(bucket) versionsPath := object + s3_constants.VersionsFolder // Try to get the .versions directory entry diff --git a/weed/s3api/s3tables/filer_ops.go b/weed/s3api/s3tables/filer_ops.go index 08004b5cf..7edb8a2a5 100644 --- a/weed/s3api/s3tables/filer_ops.go +++ b/weed/s3api/s3tables/filer_ops.go @@ -20,7 +20,7 @@ var ( func (h *S3TablesHandler) createDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { dir, name := splitPath(path) now := time.Now().Unix() - _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + return filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ Name: name, @@ -28,13 +28,74 @@ func (h *S3TablesHandler) createDirectory(ctx context.Context, client filer_pb.S Attributes: &filer_pb.FuseAttributes{ Mtime: now, Crtime: now, - FileMode: uint32(0755 | os.ModeDir), // Directory mode + FileMode: uint32(0755 | os.ModeDir), }, }, }) +} + +// ensureDirectory ensures a directory exists at the specified path +func (h *S3TablesHandler) ensureDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { + dir, name := splitPath(path) + _, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err == nil { + return nil + } + if errors.Is(err, filer_pb.ErrNotFound) { + return h.createDirectory(ctx, client, path) + } return err } +// upsertFile creates or updates a small file with the given content +func (h *S3TablesHandler) upsertFile(ctx context.Context, client filer_pb.SeaweedFilerClient, path string, data []byte) error { + dir, name := splitPath(path) + now := time.Now().Unix() + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err != nil 
{ + if !errors.Is(err, filer_pb.ErrNotFound) { + return err + } + return filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + Content: data, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + Crtime: now, + FileMode: uint32(0644), + FileSize: uint64(len(data)), + }, + }, + }) + } + + entry := resp.Entry + if entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} + } + entry.Attributes.Mtime = now + entry.Attributes.FileSize = uint64(len(data)) + entry.Content = data + return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) +} + +// deleteEntryIfExists removes an entry if it exists, ignoring missing errors +func (h *S3TablesHandler) deleteEntryIfExists(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { + dir, name := splitPath(path) + return filer_pb.DoRemove(ctx, client, dir, name, true, false, true, false, nil) +} + // setExtendedAttribute sets an extended attribute on an existing entry func (h *S3TablesHandler) setExtendedAttribute(ctx context.Context, client filer_pb.SeaweedFilerClient, path, key string, data []byte) error { dir, name := splitPath(path) @@ -57,11 +118,10 @@ func (h *S3TablesHandler) setExtendedAttribute(ctx context.Context, client filer entry.Extended[key] = data // Save the updated entry - _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, }) - return err } // getExtendedAttribute gets an extended attribute from an entry @@ -108,14 +168,14 @@ func (h *S3TablesHandler) deleteExtendedAttribute(ctx context.Context, client fi } // Save the updated entry - _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ Directory: dir, Entry: entry, }) - return err } // deleteDirectory deletes 
a directory and all its contents +// Note: DeleteEntry RPC response doesn't have an Error field, so we only check the RPC err func (h *S3TablesHandler) deleteDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { dir, name := splitPath(path) _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{ diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index 39b10ce0e..da5bbd58d 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -16,14 +16,15 @@ import ( ) const ( - TablesPath = "/table-buckets" + TablesPath = s3_constants.DefaultBucketsPath DefaultAccountID = "000000000000" DefaultRegion = "us-east-1" // Extended entry attributes for metadata storage - ExtendedKeyMetadata = "s3tables.metadata" - ExtendedKeyPolicy = "s3tables.policy" - ExtendedKeyTags = "s3tables.tags" + ExtendedKeyTableBucket = "s3tables.tableBucket" + ExtendedKeyMetadata = "s3tables.metadata" + ExtendedKeyPolicy = "s3tables.policy" + ExtendedKeyTags = "s3tables.tags" // Maximum request body size (10MB) maxRequestBodySize = 10 * 1024 * 1024 diff --git a/weed/s3api/s3tables/handler_bucket_create.go b/weed/s3api/s3tables/handler_bucket_create.go index 1e953f47c..3b4cac7e4 100644 --- a/weed/s3api/s3tables/handler_bucket_create.go +++ b/weed/s3api/s3tables/handler_bucket_create.go @@ -38,16 +38,16 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http // Check if bucket already exists and ensure no conflict with object store buckets tableBucketExists := false s3BucketExists := false + bucketsPath := s3_constants.DefaultBucketsPath err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(r.Context(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err } - bucketsPath := resp.DirBuckets - if bucketsPath == "" { - bucketsPath = s3_constants.DefaultBucketsPath + if resp.DirBuckets != "" { + 
bucketsPath = resp.DirBuckets } - _, err = filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{ + entryResp, err := filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{ Directory: bucketsPath, Name: req.Name, }) @@ -55,20 +55,15 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http if !errors.Is(err, filer_pb.ErrNotFound) { return err } - } else { - s3BucketExists = true + return nil } - _, err = filer_pb.LookupEntry(r.Context(), client, &filer_pb.LookupDirectoryEntryRequest{ - Directory: TablesPath, - Name: req.Name, - }) - if err != nil { - if errors.Is(err, filer_pb.ErrNotFound) { - return nil + if entryResp != nil && entryResp.Entry != nil { + if IsTableBucketEntry(entryResp.Entry) { + tableBucketExists = true + } else { + s3BucketExists = true } - return err } - tableBucketExists = true return nil }) @@ -78,15 +73,14 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http return err } - if s3BucketExists { - h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("bucket name %s is already used by an object store bucket", req.Name)) - return fmt.Errorf("bucket name conflicts with object store bucket") - } - if tableBucketExists { h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("table bucket %s already exists", req.Name)) return fmt.Errorf("bucket already exists") } + if s3BucketExists { + h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("bucket %s already exists", req.Name)) + return fmt.Errorf("bucket already exists") + } // Create the bucket directory and set metadata as extended attributes now := time.Now() @@ -111,11 +105,24 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http } } + // Ensure object root directory exists for table bucket S3 operations + if err := h.ensureDirectory(r.Context(), client, GetTableObjectRootDir()); err != nil 
{ + return fmt.Errorf("failed to create table object root directory: %w", err) + } + if err := h.ensureDirectory(r.Context(), client, GetTableObjectBucketPath(req.Name)); err != nil { + return fmt.Errorf("failed to create table object bucket directory: %w", err) + } + // Create bucket directory if err := h.createDirectory(r.Context(), client, bucketPath); err != nil { return err } + // Mark as a table bucket + if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyTableBucket, []byte("true")); err != nil { + return err + } + // Set metadata as extended attribute if err := h.setExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata, metadataBytes); err != nil { return err diff --git a/weed/s3api/s3tables/handler_bucket_get_list_delete.go b/weed/s3api/s3tables/handler_bucket_get_list_delete.go index 7399c0ae3..e08d4a86a 100644 --- a/weed/s3api/s3tables/handler_bucket_get_list_delete.go +++ b/weed/s3api/s3tables/handler_bucket_get_list_delete.go @@ -8,6 +8,7 @@ import ( "net/http" "strings" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) @@ -166,6 +167,10 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http. 
continue } + if !IsTableBucketEntry(entry.Entry) { + continue + } + // Read metadata from extended attribute data, ok := entry.Entry.Extended[ExtendedKeyMetadata] if !ok { @@ -343,7 +348,22 @@ func (h *S3TablesHandler) handleDeleteTableBucket(w http.ResponseWriter, r *http // Delete the bucket err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return h.deleteDirectory(r.Context(), client, bucketPath) + // Delete table object entry first, then directory + // This ensures we clean up the leaf entry even if directory deletion fails + tableObjErr := h.deleteEntryIfExists(r.Context(), client, GetTableObjectBucketPath(bucketName)) + dirErr := h.deleteDirectory(r.Context(), client, bucketPath) + + // Log any errors but don't fail if one succeeds + if tableObjErr != nil && dirErr != nil { + return fmt.Errorf("delete table object failed: %w, delete directory failed: %w", tableObjErr, dirErr) + } + if tableObjErr != nil { + glog.V(1).Infof("failed to delete table object for %s: %v", bucketName, tableObjErr) + } + if dirErr != nil { + glog.V(1).Infof("failed to delete table bucket dir for %s: %v", bucketName, dirErr) + } + return nil }) if err != nil { diff --git a/weed/s3api/s3tables/handler_policy.go b/weed/s3api/s3tables/handler_policy.go index 68aa22582..c62e72aaa 100644 --- a/weed/s3api/s3tables/handler_policy.go +++ b/weed/s3api/s3tables/handler_policy.go @@ -18,7 +18,7 @@ func (h *S3TablesHandler) extractResourceOwnerAndBucket( resourcePath string, rType ResourceType, ) (ownerAccountID, bucketName string, err error) { - // Extract bucket name from resource path (format: /table-buckets/{bucket}/... for both tables and buckets) + // Extract bucket name from resource path (format: /buckets/{bucket}/... 
for both tables and buckets) parts := strings.Split(strings.Trim(resourcePath, "/"), "/") if len(parts) >= 2 { bucketName = parts[1] diff --git a/weed/s3api/s3tables/handler_table.go b/weed/s3api/s3tables/handler_table.go index 10e0df530..b1ae0f8e1 100644 --- a/weed/s3api/s3tables/handler_table.go +++ b/weed/s3api/s3tables/handler_table.go @@ -1,6 +1,7 @@ package s3tables import ( + "context" "encoding/json" "errors" "fmt" @@ -227,6 +228,10 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque } } + if err := h.updateTableLocationMapping(r.Context(), client, "", req.MetadataLocation, tablePath); err != nil { + glog.V(1).Infof("failed to update table location mapping for %s: %v", req.MetadataLocation, err) + } + return nil }) @@ -909,7 +914,13 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque // Delete the table err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return h.deleteDirectory(r.Context(), client, tablePath) + if err := h.deleteDirectory(r.Context(), client, tablePath); err != nil { + return err + } + if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation); err != nil { + glog.V(1).Infof("failed to delete table location mapping for %s: %v", metadata.MetadataLocation, err) + } + return nil }) if err != nil { @@ -1051,6 +1062,9 @@ func (h *S3TablesHandler) handleUpdateTable(w http.ResponseWriter, r *http.Reque return ErrVersionTokenMismatch } + // Capture old metadata location before mutation for stale mapping cleanup + oldMetadataLocation := metadata.MetadataLocation + // Update metadata if req.Metadata != nil { if metadata.Metadata == nil { @@ -1086,7 +1100,13 @@ func (h *S3TablesHandler) handleUpdateTable(w http.ResponseWriter, r *http.Reque } err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, 
metadataBytes) + if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil { + return err + } + if err := h.updateTableLocationMapping(r.Context(), client, oldMetadataLocation, metadata.MetadataLocation, tablePath); err != nil { + glog.V(1).Infof("failed to update table location mapping for %s -> %s: %v", oldMetadataLocation, metadata.MetadataLocation, err) + } + return nil }) if err != nil { @@ -1101,3 +1121,35 @@ func (h *S3TablesHandler) handleUpdateTable(w http.ResponseWriter, r *http.Reque }) return nil } + +func (h *S3TablesHandler) updateTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, oldMetadataLocation, newMetadataLocation, tablePath string) error { + newTableLocationBucket, ok := parseTableLocationBucket(newMetadataLocation) + if !ok { + return nil + } + + if err := h.ensureDirectory(ctx, client, GetTableLocationMappingDir()); err != nil { + return err + } + + // If the metadata location changed, delete the stale mapping for the old bucket + if oldMetadataLocation != "" && oldMetadataLocation != newMetadataLocation { + oldTableLocationBucket, ok := parseTableLocationBucket(oldMetadataLocation) + if ok && oldTableLocationBucket != newTableLocationBucket { + oldMappingPath := GetTableLocationMappingPath(oldTableLocationBucket) + if err := h.deleteEntryIfExists(ctx, client, oldMappingPath); err != nil { + glog.V(1).Infof("failed to delete stale mapping for %s: %v", oldTableLocationBucket, err) + } + } + } + + return h.upsertFile(ctx, client, GetTableLocationMappingPath(newTableLocationBucket), []byte(tablePath)) +} + +func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation string) error { + tableLocationBucket, ok := parseTableLocationBucket(metadataLocation) + if !ok { + return nil + } + return h.deleteEntryIfExists(ctx, client, GetTableLocationMappingPath(tableLocationBucket)) +} diff --git 
a/weed/s3api/s3tables/iceberg_layout.go b/weed/s3api/s3tables/iceberg_layout.go index 0385bc7f2..6fdccf134 100644 --- a/weed/s3api/s3tables/iceberg_layout.go +++ b/weed/s3api/s3tables/iceberg_layout.go @@ -254,7 +254,7 @@ func NewTableBucketFileValidator() *TableBucketFileValidator { } // ValidateTableBucketUpload checks if a file upload to a table bucket conforms to Iceberg layout -// fullPath is the complete filer path (e.g., /table-buckets/mybucket/mynamespace/mytable/data/file.parquet) +// fullPath is the complete filer path (e.g., /buckets/mybucket/mynamespace/mytable/data/file.parquet) // Returns nil if the path is not a table bucket path or if validation passes // Returns an error if the file doesn't conform to Iceberg layout func (v *TableBucketFileValidator) ValidateTableBucketUpload(fullPath string) error { @@ -264,7 +264,7 @@ func (v *TableBucketFileValidator) ValidateTableBucketUpload(fullPath string) er } // Extract the path relative to table bucket root - // Format: /table-buckets/{bucket}/{namespace}/{table}/{relative-path} + // Format: /buckets/{bucket}/{namespace}/{table}/{relative-path} relativePath := strings.TrimPrefix(fullPath, TablesPath+"/") parts := strings.SplitN(relativePath, "/", 4) @@ -307,7 +307,7 @@ func (v *TableBucketFileValidator) ValidateTableBucketUpload(fullPath string) er return v.layoutValidator.ValidateFilePath(tableRelativePath) } -// IsTableBucketPath checks if a path is under the table-buckets directory +// IsTableBucketPath checks if a path is under the table buckets directory func IsTableBucketPath(fullPath string) bool { return strings.HasPrefix(fullPath, TablesPath+"/") } @@ -341,11 +341,6 @@ func (v *TableBucketFileValidator) ValidateTableBucketUploadWithClient( client filer_pb.SeaweedFilerClient, fullPath string, ) error { - // First check basic layout - if err := v.ValidateTableBucketUpload(fullPath); err != nil { - return err - } - // If not a table bucket path, nothing more to check if !IsTableBucketPath(fullPath) 
{ return nil @@ -357,11 +352,37 @@ func (v *TableBucketFileValidator) ValidateTableBucketUploadWithClient( return nil // Not deep enough to need validation } + if strings.HasPrefix(bucket, ".") { + return nil + } + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: TablesPath, + Name: bucket, + }) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + return nil + } + return &IcebergLayoutError{ + Code: ErrCodeInvalidIcebergLayout, + Message: "failed to verify table bucket: " + err.Error(), + } + } + if resp == nil || !IsTableBucketEntry(resp.Entry) { + return nil + } + + // Now check basic layout once we know this is a table bucket path. + if err := v.ValidateTableBucketUpload(fullPath); err != nil { + return err + } + // Verify the table exists and has ICEBERG format by checking its metadata tablePath := GetTablePath(bucket, namespace, table) dir, name := splitPath(tablePath) - resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + resp, err = filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, }) diff --git a/weed/s3api/s3tables/iceberg_layout_test.go b/weed/s3api/s3tables/iceberg_layout_test.go index 3bb871491..2a1f81130 100644 --- a/weed/s3api/s3tables/iceberg_layout_test.go +++ b/weed/s3api/s3tables/iceberg_layout_test.go @@ -104,28 +104,28 @@ func TestTableBucketFileValidator_ValidateTableBucketUpload(t *testing.T) { {"filer path", "/home/user/file.txt", false}, // Table bucket structure paths (creating directories) - {"table bucket root", "/table-buckets/mybucket", false}, - {"namespace dir", "/table-buckets/mybucket/myns", false}, - {"table dir", "/table-buckets/mybucket/myns/mytable", false}, - {"table dir trailing slash", "/table-buckets/mybucket/myns/mytable/", false}, + {"table bucket root", "/buckets/mybucket", false}, + {"namespace dir", "/buckets/mybucket/myns", false}, + {"table dir", 
"/buckets/mybucket/myns/mytable", false}, + {"table dir trailing slash", "/buckets/mybucket/myns/mytable/", false}, // Valid table bucket file uploads - {"valid parquet upload", "/table-buckets/mybucket/myns/mytable/data/file.parquet", false}, - {"valid metadata upload", "/table-buckets/mybucket/myns/mytable/metadata/v1.metadata.json", false}, - {"valid partitioned data", "/table-buckets/mybucket/myns/mytable/data/year=2024/file.parquet", false}, + {"valid parquet upload", "/buckets/mybucket/myns/mytable/data/file.parquet", false}, + {"valid metadata upload", "/buckets/mybucket/myns/mytable/metadata/v1.metadata.json", false}, + {"valid partitioned data", "/buckets/mybucket/myns/mytable/data/year=2024/file.parquet", false}, // Invalid table bucket file uploads - {"invalid file type", "/table-buckets/mybucket/myns/mytable/data/file.csv", true}, - {"invalid top-level dir", "/table-buckets/mybucket/myns/mytable/invalid/file.parquet", true}, - {"root file in table", "/table-buckets/mybucket/myns/mytable/file.parquet", true}, + {"invalid file type", "/buckets/mybucket/myns/mytable/data/file.csv", true}, + {"invalid top-level dir", "/buckets/mybucket/myns/mytable/invalid/file.parquet", true}, + {"root file in table", "/buckets/mybucket/myns/mytable/file.parquet", true}, // Empty segment cases - {"empty bucket", "/table-buckets//myns/mytable/data/file.parquet", true}, - {"empty namespace", "/table-buckets/mybucket//mytable/data/file.parquet", true}, - {"empty table", "/table-buckets/mybucket/myns//data/file.parquet", true}, - {"empty bucket dir", "/table-buckets//", true}, - {"empty namespace dir", "/table-buckets/mybucket//", true}, - {"table double slash bypass", "/table-buckets/mybucket/myns/mytable//data/file.parquet", true}, + {"empty bucket", "/buckets//myns/mytable/data/file.parquet", true}, + {"empty namespace", "/buckets/mybucket//mytable/data/file.parquet", true}, + {"empty table", "/buckets/mybucket/myns//data/file.parquet", true}, + {"empty bucket dir", 
"/buckets//", true}, + {"empty namespace dir", "/buckets/mybucket//", true}, + {"table double slash bypass", "/buckets/mybucket/myns/mytable//data/file.parquet", true}, } for _, tt := range tests { @@ -143,11 +143,10 @@ func TestIsTableBucketPath(t *testing.T) { path string want bool }{ - {"/table-buckets/mybucket", true}, - {"/table-buckets/mybucket/ns/table/data/file.parquet", true}, - {"/buckets/mybucket", false}, + {"/buckets/mybucket", true}, + {"/buckets/mybucket/ns/table/data/file.parquet", true}, {"/home/user/file.txt", false}, - {"table-buckets/mybucket", false}, // missing leading slash + {"buckets/mybucket", false}, // missing leading slash } for _, tt := range tests { @@ -166,11 +165,11 @@ func TestGetTableInfoFromPath(t *testing.T) { wantNamespace string wantTable string }{ - {"/table-buckets/mybucket/myns/mytable/data/file.parquet", "mybucket", "myns", "mytable"}, - {"/table-buckets/mybucket/myns/mytable", "mybucket", "myns", "mytable"}, - {"/table-buckets/mybucket/myns", "mybucket", "myns", ""}, - {"/table-buckets/mybucket", "mybucket", "", ""}, - {"/buckets/mybucket", "", "", ""}, + {"/buckets/mybucket/myns/mytable/data/file.parquet", "mybucket", "myns", "mytable"}, + {"/buckets/mybucket/myns/mytable", "mybucket", "myns", "mytable"}, + {"/buckets/mybucket/myns", "mybucket", "myns", ""}, + {"/buckets/mybucket", "mybucket", "", ""}, + {"/home/user/file.txt", "", "", ""}, } for _, tt := range tests { diff --git a/weed/s3api/s3tables/utils.go b/weed/s3api/s3tables/utils.go index 0fd78f0c1..f6be5f530 100644 --- a/weed/s3api/s3tables/utils.go +++ b/weed/s3api/s3tables/utils.go @@ -9,6 +9,8 @@ import ( "regexp" "strings" "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) const ( @@ -17,6 +19,11 @@ const ( tableNamePatternStr = `[a-z0-9_]+` ) +const ( + tableLocationMappingsDirName = ".table-location-mappings" + tableObjectRootDirName = ".objects" +) + var ( bucketARNPattern = regexp.MustCompile(`^arn:aws:s3tables:[^:]*:[^:]*:bucket/(` + 
bucketNamePatternStr + `)$`) tableARNPattern = regexp.MustCompile(`^arn:aws:s3tables:[^:]*:[^:]*:bucket/(` + bucketNamePatternStr + `)/table/(` + tableNamespacePatternStr + `)/(` + tableNamePatternStr + `)$`) @@ -94,6 +101,26 @@ func GetTablePath(bucketName, namespace, tableName string) string { return path.Join(TablesPath, bucketName, namespace, tableName) } +// GetTableObjectRootDir returns the root path for table bucket object storage +func GetTableObjectRootDir() string { + return path.Join(TablesPath, tableObjectRootDirName) +} + +// GetTableObjectBucketPath returns the filer path for table bucket object storage +func GetTableObjectBucketPath(bucketName string) string { + return path.Join(GetTableObjectRootDir(), bucketName) +} + +// GetTableLocationMappingDir returns the root path for table location bucket mappings +func GetTableLocationMappingDir() string { + return path.Join(TablesPath, tableLocationMappingsDirName) +} + +// GetTableLocationMappingPath returns the filer path for a table location bucket mapping +func GetTableLocationMappingPath(tableLocationBucket string) string { + return path.Join(GetTableLocationMappingDir(), tableLocationBucket) +} + // Metadata structures type tableBucketMetadata struct { @@ -123,6 +150,15 @@ type tableMetadataInternal struct { Metadata *TableMetadata `json:"metadata,omitempty"` } +// IsTableBucketEntry returns true when the entry is marked as a table bucket. +func IsTableBucketEntry(entry *filer_pb.Entry) bool { + if entry == nil || entry.Extended == nil { + return false + } + _, ok := entry.Extended[ExtendedKeyTableBucket] + return ok +} + // Utility functions // validateBucketName validates bucket name and returns an error if invalid. 
// parseTableLocationBucket extracts the bucket name from an "s3://" metadata
// location. It returns the bucket and true only when the location uses the
// "s3://" scheme and the bucket component (the text up to the first "/") is
// non-empty and ends with "--table-s3"; otherwise it returns "" and false.
func parseTableLocationBucket(metadataLocation string) (string, bool) {
	const (
		scheme            = "s3://"
		tableBucketSuffix = "--table-s3"
	)

	if !strings.HasPrefix(metadataLocation, scheme) {
		return "", false
	}

	// The bucket is everything between the scheme and the first slash, or the
	// whole remainder when there is no slash at all.
	name := metadataLocation[len(scheme):]
	if slash := strings.IndexByte(name, '/'); slash >= 0 {
		name = name[:slash]
	}

	if name == "" || !strings.HasSuffix(name, tableBucketSuffix) {
		return "", false
	}
	return name, true
}