From 33da87452be51d7f56c5edacd232ac75b0468467 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 28 Jan 2026 10:04:27 -0800 Subject: [PATCH] Refine S3 Tables implementation to address code review feedback - Standardize namespace representation to []string - Improve listing logic with pagination and StartFromFileName - Enhance error handling with sentinel errors and robust checks - Add JSON encoding error logging - Fix CI workflow to use gofmt -l - Standardize timestamps in directory creation - Validate single-level namespaces --- .github/workflows/s3-tables-tests.yml | 12 +- test/s3tables/client.go | 12 +- weed/s3api/s3tables/filer_ops.go | 19 ++- weed/s3api/s3tables/handler.go | 8 +- .../handler_bucket_get_list_delete.go | 95 +++++++----- weed/s3api/s3tables/handler_namespace.go | 132 ++++++++++------- weed/s3api/s3tables/handler_policy.go | 36 +++-- weed/s3api/s3tables/handler_table.go | 140 ++++++++++++------ weed/s3api/s3tables/types.go | 56 +++---- weed/s3api/s3tables/utils.go | 24 +++ 10 files changed, 336 insertions(+), 198 deletions(-) diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 131d0c2aa..7d453962b 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -133,10 +133,10 @@ jobs: run: | set -x echo "=== Checking S3 Tables Go format ===" - unformatted=$(go fmt ./weed/s3api/s3tables/... 2>&1 | wc -l) - if [ "$unformatted" -gt 0 ]; then + unformatted=$(gofmt -l ./weed/s3api/s3tables) + if [ -n "$unformatted" ]; then echo "Go format check failed - files need formatting" - go fmt ./weed/s3api/s3tables/... + echo "$unformatted" exit 1 fi echo "All S3 Tables files are properly formatted" @@ -145,10 +145,10 @@ jobs: run: | set -x echo "=== Checking S3 Tables test format ===" - unformatted=$(go fmt ./test/s3tables/... 2>&1 | wc -l) - if [ "$unformatted" -gt 0 ]; then + unformatted=$(gofmt -l ./test/s3tables) + if [ -n "$unformatted" ]; then echo "Go format check failed for tests" - go fmt ./test/s3tables/... + echo "$unformatted" exit 1 fi echo "All S3 Tables test files are properly formatted" diff --git a/test/s3tables/client.go b/test/s3tables/client.go index 5f543cdb2..6c3a32278 100644 --- a/test/s3tables/client.go +++ b/test/s3tables/client.go @@ -173,7 +173,7 @@ func (c *S3TablesClient) CreateNamespace(bucketARN string, namespace []string) ( func (c *S3TablesClient) GetNamespace(bucketARN, namespace string) (*s3tables.GetNamespaceResponse, error) { req := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, } resp, err := c.doRequest("GetNamespace", req) @@ -225,7 +225,7 @@ func (c *S3TablesClient) ListNamespaces(bucketARN, prefix string) (*s3tables.Lis func (c *S3TablesClient) DeleteNamespace(bucketARN, namespace string) error { req := &s3tables.DeleteNamespaceRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, } resp, err := c.doRequest("DeleteNamespace", req) @@ -248,7 +248,7 @@ func (c *S3TablesClient) DeleteNamespace(bucketARN, namespace string) error { func (c *S3TablesClient) CreateTable(bucketARN, namespace, name, format string, metadata *s3tables.TableMetadata, tags map[string]string) (*s3tables.CreateTableResponse, error) { req := &s3tables.CreateTableRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, Name: name, Format: format, Metadata: metadata, @@ -278,7 +278,7 @@ func (c *S3TablesClient) CreateTable(bucketARN, namespace, name, format string, func (c *S3TablesClient) GetTable(bucketARN, namespace, name string) (*s3tables.GetTableResponse, error) { req := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, Name: name, } @@ -305,7 +305,7 @@ func (c *S3TablesClient) GetTable(bucketARN, namespace, name string) (*s3tables. func (c *S3TablesClient) ListTables(bucketARN, namespace, prefix string) (*s3tables.ListTablesResponse, error) { req := &s3tables.ListTablesRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, Prefix: prefix, } @@ -332,7 +332,7 @@ func (c *S3TablesClient) ListTables(bucketARN, namespace, prefix string) (*s3tab func (c *S3TablesClient) DeleteTable(bucketARN, namespace, name string) error { req := &s3tables.DeleteTableRequest{ TableBucketARN: bucketARN, - Namespace: namespace, + Namespace: []string{namespace}, Name: name, } diff --git a/weed/s3api/s3tables/filer_ops.go b/weed/s3api/s3tables/filer_ops.go index b512ba8c9..393d7294e 100644 --- a/weed/s3api/s3tables/filer_ops.go +++ b/weed/s3api/s3tables/filer_ops.go @@ -2,25 +2,32 @@ package s3tables import ( "context" + "errors" "fmt" "time" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) +var ( + ErrNotFound = errors.New("entry not found") + ErrAttributeNotFound = errors.New("attribute not found") +) + // Filer operations - Common functions for interacting with the filer // createDirectory creates a new directory at the specified path func (h *S3TablesHandler) createDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { dir, name := splitPath(path) + now := time.Now().Unix() _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ Name: name, IsDirectory: true, Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), + Mtime: now, + Crtime: now, FileMode: uint32(0755 | 1<<31), // Directory mode }, }, @@ -43,7 +50,7 @@ func (h *S3TablesHandler) setExtendedAttribute(ctx context.Context, client filer entry := resp.Entry if entry == nil { - return fmt.Errorf("entry not found: %s", path) + return fmt.Errorf("%w: %s", ErrNotFound, path) } // Update the extended attributes @@ -72,12 +79,12 @@ func (h *S3TablesHandler) getExtendedAttribute(ctx context.Context, client filer } if resp.Entry == nil { - return nil, fmt.Errorf("entry not found: %s", path) + return nil, fmt.Errorf("%w: %s", ErrNotFound, path) } data, ok := resp.Entry.Extended[key] if !ok { - return nil, fmt.Errorf("attribute not found: %s", key) + return nil, fmt.Errorf("%w: %s", ErrAttributeNotFound, key) } return data, nil @@ -98,7 +105,7 @@ func (h *S3TablesHandler) deleteExtendedAttribute(ctx context.Context, client fi entry := resp.Entry if entry == nil { - return fmt.Errorf("entry not found: %s", path) + return fmt.Errorf("%w: %s", ErrNotFound, path) } // Remove the extended attribute diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index dd565b1d5..88b5d9148 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -178,7 +178,9 @@ func (h *S3TablesHandler) writeJSON(w http.ResponseWriter, status int, data inte w.Header().Set("Content-Type", "application/x-amz-json-1.1") w.WriteHeader(status) if data != nil { - json.NewEncoder(w).Encode(data) + if err := json.NewEncoder(w).Encode(data); err != nil { + glog.Errorf("S3Tables: failed to encode response: %v", err) + } } } @@ -189,7 +191,9 @@ func (h *S3TablesHandler) writeError(w http.ResponseWriter, status int, code, me "__type": code, "message": message, } - json.NewEncoder(w).Encode(errorResponse) + if err := json.NewEncoder(w).Encode(errorResponse); err != nil { + glog.Errorf("S3Tables: failed to encode error response: %v", err) + } } // ARN generation helpers diff --git a/weed/s3api/s3tables/handler_bucket_get_list_delete.go b/weed/s3api/s3tables/handler_bucket_get_list_delete.go index 6ec25bd1a..24d9e6199 100644 --- a/weed/s3api/s3tables/handler_bucket_get_list_delete.go +++ b/weed/s3api/s3tables/handler_bucket_get_list_delete.go @@ -2,6 +2,7 @@ package s3tables import ( "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -77,51 +78,67 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http. var buckets []TableBucketSummary + var lastFileName string err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{ - Directory: TablesPath, - Limit: uint32(maxBuckets), - }) - if err != nil { - return err - } - - for { - entry, err := resp.Recv() + for len(buckets) < maxBuckets { + resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{ + Directory: TablesPath, + Limit: uint32(maxBuckets * 2), // Fetch more than needed to account for filtering + StartFromFileName: lastFileName, + InclusiveStartFrom: lastFileName != "", + }) if err != nil { - break + return err } - if entry.Entry == nil || !entry.Entry.IsDirectory { - continue + hasMore := false + for { + entry, respErr := resp.Recv() + if respErr != nil { + break + } + hasMore = true + lastFileName = entry.Entry.Name + + if !entry.Entry.IsDirectory { + continue + } + + // Skip entries starting with "." + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + // Apply prefix filter + if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { + continue + } + + // Read metadata from extended attribute + data, ok := entry.Entry.Extended[ExtendedKeyMetadata] + if !ok { + continue + } + + var metadata tableBucketMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + continue + } + + buckets = append(buckets, TableBucketSummary{ + ARN: h.generateTableBucketARN(entry.Entry.Name), + Name: entry.Entry.Name, + CreatedAt: metadata.CreatedAt, + }) + + if len(buckets) >= maxBuckets { + return nil + } } - // Skip entries starting with "." - if strings.HasPrefix(entry.Entry.Name, ".") { - continue - } - - // Apply prefix filter - if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { - continue - } - - // Read metadata from extended attribute - data, ok := entry.Entry.Extended[ExtendedKeyMetadata] - if !ok { - continue - } - - var metadata tableBucketMetadata - if err := json.Unmarshal(data, &metadata); err != nil { - continue + if !hasMore { + break } - - buckets = append(buckets, TableBucketSummary{ - ARN: h.generateTableBucketARN(entry.Entry.Name), - Name: entry.Entry.Name, - CreatedAt: metadata.CreatedAt, - }) } return nil @@ -129,7 +146,7 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http. if err != nil { // Check if it's a "not found" error - return empty list in that case - if strings.Contains(err.Error(), "no entry is found") || strings.Contains(err.Error(), "not found") { + if errors.Is(err, ErrNotFound) { buckets = []TableBucketSummary{} } else { // For other errors, return error response diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index f4c2a6078..df92bf9dd 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -34,13 +34,10 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R return err } - // For simplicity, use the first namespace element as the directory name - namespaceName := req.Namespace[0] - - // Validate namespace name - if len(namespaceName) < 1 || len(namespaceName) > 255 { - h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace name must be between 1 and 255 characters") - return fmt.Errorf("invalid namespace name length") + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err } // Check if table bucket exists @@ -122,9 +119,15 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ return err } - if req.TableBucketARN == "" || req.Namespace == "" { - h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required") - return fmt.Errorf("tableBucketARN and namespace are required") + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err } bucketName, err := parseBucketNameFromARN(req.TableBucketARN) @@ -133,7 +136,7 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ return err } - namespacePath := getNamespacePath(bucketName, req.Namespace) + namespacePath := getNamespacePath(bucketName, namespaceName) // Get namespace var metadata namespaceMetadata @@ -146,7 +149,7 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ }) if err != nil { - h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace)) + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", flattenNamespace(req.Namespace))) return err } @@ -187,51 +190,66 @@ func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Re bucketPath := getTableBucketPath(bucketName) var namespaces []NamespaceSummary - // List namespaces + var lastFileName string err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{ - Directory: bucketPath, - Limit: uint32(maxNamespaces), - }) - if err != nil { - return err - } - - for { - entry, err := resp.Recv() + for len(namespaces) < maxNamespaces { + resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{ + Directory: bucketPath, + Limit: uint32(maxNamespaces * 2), + StartFromFileName: lastFileName, + InclusiveStartFrom: lastFileName != "", + }) if err != nil { - break - } - - if entry.Entry == nil || !entry.Entry.IsDirectory { - continue - } - - // Skip hidden entries - if strings.HasPrefix(entry.Entry.Name, ".") { - continue - } - - // Apply prefix filter - if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { - continue + return err } - // Read metadata from extended attribute - data, ok := entry.Entry.Extended[ExtendedKeyMetadata] - if !ok { - continue + hasMore := false + for { + entry, respErr := resp.Recv() + if respErr != nil { + break + } + hasMore = true + lastFileName = entry.Entry.Name + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip hidden entries + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + // Apply prefix filter + if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { + continue + } + + // Read metadata from extended attribute + data, ok := entry.Entry.Extended[ExtendedKeyMetadata] + if !ok { + continue + } + + var metadata namespaceMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + continue + } + + namespaces = append(namespaces, NamespaceSummary{ + Namespace: metadata.Namespace, + CreatedAt: metadata.CreatedAt, + }) + + if len(namespaces) >= maxNamespaces { + return nil + } } - var metadata namespaceMetadata - if err := json.Unmarshal(data, &metadata); err != nil { - continue + if !hasMore { + break } - - namespaces = append(namespaces, NamespaceSummary{ - Namespace: metadata.Namespace, - CreatedAt: metadata.CreatedAt, - }) } return nil @@ -257,9 +275,15 @@ func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.R return err } - if req.TableBucketARN == "" || req.Namespace == "" { - h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required") - return fmt.Errorf("tableBucketARN and namespace are required") + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err } bucketName, err := parseBucketNameFromARN(req.TableBucketARN) @@ -268,7 +292,7 @@ func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.R return err } - namespacePath := getNamespacePath(bucketName, req.Namespace) + namespacePath := getNamespacePath(bucketName, namespaceName) // Check if namespace exists and is empty hasChildren := false diff --git a/weed/s3api/s3tables/handler_policy.go b/weed/s3api/s3tables/handler_policy.go index a6b42ea71..97e250273 100644 --- a/weed/s3api/s3tables/handler_policy.go +++ b/weed/s3api/s3tables/handler_policy.go @@ -2,9 +2,9 @@ package s3tables import ( "encoding/json" + "errors" "fmt" "net/http" - "strings" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) @@ -125,7 +125,7 @@ func (h *S3TablesHandler) handleDeleteTableBucketPolicy(w http.ResponseWriter, r return h.deleteExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy) }) - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) { h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table bucket policy") return err } @@ -142,11 +142,17 @@ func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Re return err } - if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") return fmt.Errorf("missing required parameters") } + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + if req.ResourcePolicy == "" { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required") return fmt.Errorf("resourcePolicy is required") @@ -159,7 +165,7 @@ func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Re } // Check if table exists - tablePath := getTablePath(bucketName, req.Namespace, req.Name) + tablePath := getTablePath(bucketName, namespaceName, req.Name) var tableExists bool err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) @@ -194,18 +200,24 @@ func (h *S3TablesHandler) handleGetTablePolicy(w http.ResponseWriter, r *http.Re return err } - if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") return fmt.Errorf("missing required parameters") } + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) if err != nil { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } - tablePath := getTablePath(bucketName, req.Namespace, req.Name) + tablePath := getTablePath(bucketName, namespaceName, req.Name) var policy []byte err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { var readErr error @@ -234,23 +246,29 @@ func (h *S3TablesHandler) handleDeleteTablePolicy(w http.ResponseWriter, r *http return err } - if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") return fmt.Errorf("missing required parameters") } + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) if err != nil { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } - tablePath := getTablePath(bucketName, req.Namespace, req.Name) + tablePath := getTablePath(bucketName, namespaceName, req.Name) err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return h.deleteExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy) }) - if err != nil && !strings.Contains(err.Error(), "not found") { + if err != nil && !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) { h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table policy") return err } diff --git a/weed/s3api/s3tables/handler_table.go b/weed/s3api/s3tables/handler_table.go index da1aa6d9a..68189d844 100644 --- a/weed/s3api/s3tables/handler_table.go +++ b/weed/s3api/s3tables/handler_table.go @@ -24,9 +24,10 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque return fmt.Errorf("tableBucketARN is required") } - if req.Namespace == "" { - h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required") - return fmt.Errorf("namespace is required") + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err } if req.Name == "" { @@ -58,7 +59,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque } // Check if namespace exists - namespacePath := getNamespacePath(bucketName, req.Namespace) + namespacePath := getNamespacePath(bucketName, namespaceName) var namespaceExists bool err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata) @@ -67,11 +68,11 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque }) if !namespaceExists { - h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace)) + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", namespaceName)) return fmt.Errorf("namespace not found") } - tablePath := getTablePath(bucketName, req.Namespace, req.Name) + tablePath := getTablePath(bucketName, namespaceName, req.Name) // Check if table already exists exists := false @@ -92,7 +93,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque metadata := &tableMetadataInternal{ Name: req.Name, - Namespace: req.Namespace, + Namespace: namespaceName, Format: req.Format, CreatedAt: now, ModifiedAt: now, @@ -143,7 +144,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque return err } - tableARN := h.generateTableARN(bucketName, req.Namespace, req.Name) + tableARN := h.generateTableARN(bucketName, namespaceName, req.Name) resp := &CreateTableResponse{ TableARN: tableARN, @@ -172,13 +173,17 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } - } else if req.TableBucketARN != "" && req.Namespace != "" && req.Name != "" { + } else if req.TableBucketARN != "" && len(req.Namespace) > 0 && req.Name != "" { bucketName, err = parseBucketNameFromARN(req.TableBucketARN) if err != nil { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } - namespace = req.Namespace + namespace, err = validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } tableName = req.Name } else { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required") @@ -246,8 +251,15 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques var tables []TableSummary // If namespace is specified, list tables in that namespace only - if req.Namespace != "" { - err = h.listTablesInNamespace(r.Context(), filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables) + if len(req.Namespace) > 0 { + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.listTablesInNamespaceWithClient(r.Context(), client, bucketName, namespaceName, req.Prefix, maxTables, &tables) + }) } else { // List tables in all namespaces err = h.listTablesInAllNamespaces(r.Context(), filerClient, bucketName, req.Prefix, maxTables, &tables) @@ -265,23 +277,29 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques return nil } -func (h *S3TablesHandler) listTablesInNamespace(ctx context.Context, filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error { +func (h *S3TablesHandler) listTablesInNamespaceWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error { namespacePath := getNamespacePath(bucketName, namespace) - return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var lastFileName string + for len(*tables) < maxTables { resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: namespacePath, - Limit: uint32(maxTables), + Directory: namespacePath, + Limit: uint32(maxTables * 2), + StartFromFileName: lastFileName, + InclusiveStartFrom: lastFileName != "", }) if err != nil { return err } + hasMore := false for { - entry, err := resp.Recv() - if err != nil { + entry, respErr := resp.Recv() + if respErr != nil { break } + hasMore = true + lastFileName = entry.Entry.Name if entry.Entry == nil || !entry.Entry.IsDirectory { continue @@ -317,48 +335,68 @@ func (h *S3TablesHandler) listTablesInNamespace(ctx context.Context, filerClient CreatedAt: metadata.CreatedAt, ModifiedAt: metadata.ModifiedAt, }) + + if len(*tables) >= maxTables { + return nil + } } - return nil - }) + if !hasMore { + break + } + } + + return nil } func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error { bucketPath := getTableBucketPath(bucketName) + var lastFileName string return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // List all namespaces first - resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: bucketPath, - Limit: 1000, - }) - if err != nil { - return err - } - for { - entry, err := resp.Recv() + // List namespaces in batches + resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: bucketPath, + Limit: 100, + StartFromFileName: lastFileName, + InclusiveStartFrom: lastFileName != "", + }) if err != nil { - break - } - - if entry.Entry == nil || !entry.Entry.IsDirectory { - continue - } - - // Skip hidden entries - if strings.HasPrefix(entry.Entry.Name, ".") { - continue + return err } - namespace := entry.Entry.Name - - // List tables in this namespace - if err := h.listTablesInNamespace(ctx, filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil { - continue + hasMore := false + for { + entry, respErr := resp.Recv() + if respErr != nil { + break + } + hasMore = true + lastFileName = entry.Entry.Name + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip hidden entries + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + namespace := entry.Entry.Name + + // List tables in this namespace + if err := h.listTablesInNamespaceWithClient(ctx, client, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil { + continue + } + + if len(*tables) >= maxTables { + return nil + } } - if len(*tables) >= maxTables { + if !hasMore { break } } @@ -375,18 +413,24 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque return err } - if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") return fmt.Errorf("missing required parameters") } + namespaceName, err := validateNamespace(req.Namespace) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) if err != nil { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } - tablePath := getTablePath(bucketName, req.Namespace, req.Name) + tablePath := getTablePath(bucketName, namespaceName, req.Name) // Check if table exists var tableExists bool diff --git a/weed/s3api/s3tables/types.go b/weed/s3api/s3tables/types.go index 281ee73fe..2b7abdba7 100644 --- a/weed/s3api/s3tables/types.go +++ b/weed/s3api/s3tables/types.go @@ -90,8 +90,8 @@ type CreateNamespaceResponse struct { } type GetNamespaceRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` } type GetNamespaceResponse struct { @@ -118,8 +118,8 @@ type ListNamespacesResponse struct { } type DeleteNamespaceRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` } // Table types @@ -156,7 +156,7 @@ type Table struct { type CreateTableRequest struct { TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` + Namespace []string `json:"namespace"` Name string `json:"name"` Format string `json:"format"` Metadata *TableMetadata `json:"metadata,omitempty"` @@ -170,10 +170,10 @@ type CreateTableResponse struct { } type GetTableRequest struct { - TableBucketARN string `json:"tableBucketARN,omitempty"` - Namespace string `json:"namespace,omitempty"` - Name string `json:"name,omitempty"` - TableARN string `json:"tableARN,omitempty"` + TableBucketARN string `json:"tableBucketARN,omitempty"` + Namespace []string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + TableARN string `json:"tableARN,omitempty"` } type GetTableResponse struct { @@ -189,11 +189,11 @@ type GetTableResponse struct { } type ListTablesRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace,omitempty"` - Prefix string `json:"prefix,omitempty"` - ContinuationToken string `json:"continuationToken,omitempty"` - MaxTables int `json:"maxTables,omitempty"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace,omitempty"` + Prefix string `json:"prefix,omitempty"` + ContinuationToken string `json:"continuationToken,omitempty"` + MaxTables int `json:"maxTables,omitempty"` } type TableSummary struct { @@ -211,25 +211,25 @@ type ListTablesResponse struct { } type DeleteTableRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` - Name string `json:"name"` - VersionToken string `json:"versionToken,omitempty"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` + Name string `json:"name"` + VersionToken string `json:"versionToken,omitempty"` } // Table policy types type PutTablePolicyRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` - Name string `json:"name"` - ResourcePolicy string `json:"resourcePolicy"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` + Name string `json:"name"` + ResourcePolicy string `json:"resourcePolicy"` } type GetTablePolicyRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` - Name string `json:"name"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` + Name string `json:"name"` } type GetTablePolicyResponse struct { @@ -237,9 +237,9 @@ type GetTablePolicyResponse struct { } type DeleteTablePolicyRequest struct { - TableBucketARN string `json:"tableBucketARN"` - Namespace string `json:"namespace"` - Name string `json:"name"` + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` + Name string `json:"name"` } // Tagging types diff --git a/weed/s3api/s3tables/utils.go b/weed/s3api/s3tables/utils.go index b67af5863..3ceec3d19 100644 --- a/weed/s3api/s3tables/utils.go +++ b/weed/s3api/s3tables/utils.go @@ -5,6 +5,7 @@ import ( "path" "path/filepath" "regexp" + "strings" "time" ) @@ -98,3 +99,26 @@ func splitPath(path string) (dir, name string) { name = filepath.Base(path) return } + +// validateNamespace validates that the namespace provided is supported (single-level) +func validateNamespace(namespace []string) (string, error) { + if len(namespace) == 0 { + return "", fmt.Errorf("namespace is required") + } + if len(namespace) > 1 { + return "", fmt.Errorf("multi-level namespaces are not supported") + } + name := namespace[0] + if len(name) < 1 || len(name) > 255 { + return "", fmt.Errorf("namespace name must be between 1 and 255 characters") + } + return name, nil +} + +// flattenNamespace joins namespace elements into a single string (using dots as per AWS S3 Tables) +func flattenNamespace(namespace []string) string { + if len(namespace) == 0 { + return "" + } + return strings.Join(namespace, ".") +}