Browse Source

Refine S3 Tables implementation to address code review feedback

- Standardize namespace representation to []string
- Improve listing logic with pagination and StartFromFileName
- Enhance error handling with sentinel errors and robust checks
- Add JSON encoding error logging
- Fix CI workflow to use gofmt -l
- Standardize timestamps in directory creation
- Validate single-level namespaces
pull/8147/head
Chris Lu 3 days ago
parent
commit
33da87452b
  1. 12
      .github/workflows/s3-tables-tests.yml
  2. 12
      test/s3tables/client.go
  3. 19
      weed/s3api/s3tables/filer_ops.go
  4. 8
      weed/s3api/s3tables/handler.go
  5. 95
      weed/s3api/s3tables/handler_bucket_get_list_delete.go
  6. 132
      weed/s3api/s3tables/handler_namespace.go
  7. 36
      weed/s3api/s3tables/handler_policy.go
  8. 140
      weed/s3api/s3tables/handler_table.go
  9. 56
      weed/s3api/s3tables/types.go
  10. 24
      weed/s3api/s3tables/utils.go

12
.github/workflows/s3-tables-tests.yml

@ -133,10 +133,10 @@ jobs:
run: | run: |
set -x set -x
echo "=== Checking S3 Tables Go format ===" echo "=== Checking S3 Tables Go format ==="
unformatted=$(go fmt ./weed/s3api/s3tables/... 2>&1 | wc -l)
if [ "$unformatted" -gt 0 ]; then
unformatted=$(gofmt -l ./weed/s3api/s3tables)
if [ -n "$unformatted" ]; then
echo "Go format check failed - files need formatting" echo "Go format check failed - files need formatting"
go fmt ./weed/s3api/s3tables/...
echo "$unformatted"
exit 1 exit 1
fi fi
echo "All S3 Tables files are properly formatted" echo "All S3 Tables files are properly formatted"
@ -145,10 +145,10 @@ jobs:
run: | run: |
set -x set -x
echo "=== Checking S3 Tables test format ===" echo "=== Checking S3 Tables test format ==="
unformatted=$(go fmt ./test/s3tables/... 2>&1 | wc -l)
if [ "$unformatted" -gt 0 ]; then
unformatted=$(gofmt -l ./test/s3tables)
if [ -n "$unformatted" ]; then
echo "Go format check failed for tests" echo "Go format check failed for tests"
go fmt ./test/s3tables/...
echo "$unformatted"
exit 1 exit 1
fi fi
echo "All S3 Tables test files are properly formatted" echo "All S3 Tables test files are properly formatted"

12
test/s3tables/client.go

@ -173,7 +173,7 @@ func (c *S3TablesClient) CreateNamespace(bucketARN string, namespace []string) (
func (c *S3TablesClient) GetNamespace(bucketARN, namespace string) (*s3tables.GetNamespaceResponse, error) { func (c *S3TablesClient) GetNamespace(bucketARN, namespace string) (*s3tables.GetNamespaceResponse, error) {
req := &s3tables.GetNamespaceRequest{ req := &s3tables.GetNamespaceRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
} }
resp, err := c.doRequest("GetNamespace", req) resp, err := c.doRequest("GetNamespace", req)
@ -225,7 +225,7 @@ func (c *S3TablesClient) ListNamespaces(bucketARN, prefix string) (*s3tables.Lis
func (c *S3TablesClient) DeleteNamespace(bucketARN, namespace string) error { func (c *S3TablesClient) DeleteNamespace(bucketARN, namespace string) error {
req := &s3tables.DeleteNamespaceRequest{ req := &s3tables.DeleteNamespaceRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
} }
resp, err := c.doRequest("DeleteNamespace", req) resp, err := c.doRequest("DeleteNamespace", req)
@ -248,7 +248,7 @@ func (c *S3TablesClient) DeleteNamespace(bucketARN, namespace string) error {
func (c *S3TablesClient) CreateTable(bucketARN, namespace, name, format string, metadata *s3tables.TableMetadata, tags map[string]string) (*s3tables.CreateTableResponse, error) { func (c *S3TablesClient) CreateTable(bucketARN, namespace, name, format string, metadata *s3tables.TableMetadata, tags map[string]string) (*s3tables.CreateTableResponse, error) {
req := &s3tables.CreateTableRequest{ req := &s3tables.CreateTableRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
Name: name, Name: name,
Format: format, Format: format,
Metadata: metadata, Metadata: metadata,
@ -278,7 +278,7 @@ func (c *S3TablesClient) CreateTable(bucketARN, namespace, name, format string,
func (c *S3TablesClient) GetTable(bucketARN, namespace, name string) (*s3tables.GetTableResponse, error) { func (c *S3TablesClient) GetTable(bucketARN, namespace, name string) (*s3tables.GetTableResponse, error) {
req := &s3tables.GetTableRequest{ req := &s3tables.GetTableRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
Name: name, Name: name,
} }
@ -305,7 +305,7 @@ func (c *S3TablesClient) GetTable(bucketARN, namespace, name string) (*s3tables.
func (c *S3TablesClient) ListTables(bucketARN, namespace, prefix string) (*s3tables.ListTablesResponse, error) { func (c *S3TablesClient) ListTables(bucketARN, namespace, prefix string) (*s3tables.ListTablesResponse, error) {
req := &s3tables.ListTablesRequest{ req := &s3tables.ListTablesRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
Prefix: prefix, Prefix: prefix,
} }
@ -332,7 +332,7 @@ func (c *S3TablesClient) ListTables(bucketARN, namespace, prefix string) (*s3tab
func (c *S3TablesClient) DeleteTable(bucketARN, namespace, name string) error { func (c *S3TablesClient) DeleteTable(bucketARN, namespace, name string) error {
req := &s3tables.DeleteTableRequest{ req := &s3tables.DeleteTableRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
Namespace: namespace,
Namespace: []string{namespace},
Name: name, Name: name,
} }

19
weed/s3api/s3tables/filer_ops.go

@ -2,25 +2,32 @@ package s3tables
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
) )
var (
ErrNotFound = errors.New("entry not found")
ErrAttributeNotFound = errors.New("attribute not found")
)
// Filer operations - Common functions for interacting with the filer // Filer operations - Common functions for interacting with the filer
// createDirectory creates a new directory at the specified path // createDirectory creates a new directory at the specified path
func (h *S3TablesHandler) createDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error { func (h *S3TablesHandler) createDirectory(ctx context.Context, client filer_pb.SeaweedFilerClient, path string) error {
dir, name := splitPath(path) dir, name := splitPath(path)
now := time.Now().Unix()
_, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
Directory: dir, Directory: dir,
Entry: &filer_pb.Entry{ Entry: &filer_pb.Entry{
Name: name, Name: name,
IsDirectory: true, IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{ Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
Mtime: now,
Crtime: now,
FileMode: uint32(0755 | 1<<31), // Directory mode FileMode: uint32(0755 | 1<<31), // Directory mode
}, },
}, },
@ -43,7 +50,7 @@ func (h *S3TablesHandler) setExtendedAttribute(ctx context.Context, client filer
entry := resp.Entry entry := resp.Entry
if entry == nil { if entry == nil {
return fmt.Errorf("entry not found: %s", path)
return fmt.Errorf("%w: %s", ErrNotFound, path)
} }
// Update the extended attributes // Update the extended attributes
@ -72,12 +79,12 @@ func (h *S3TablesHandler) getExtendedAttribute(ctx context.Context, client filer
} }
if resp.Entry == nil { if resp.Entry == nil {
return nil, fmt.Errorf("entry not found: %s", path)
return nil, fmt.Errorf("%w: %s", ErrNotFound, path)
} }
data, ok := resp.Entry.Extended[key] data, ok := resp.Entry.Extended[key]
if !ok { if !ok {
return nil, fmt.Errorf("attribute not found: %s", key)
return nil, fmt.Errorf("%w: %s", ErrAttributeNotFound, key)
} }
return data, nil return data, nil
@ -98,7 +105,7 @@ func (h *S3TablesHandler) deleteExtendedAttribute(ctx context.Context, client fi
entry := resp.Entry entry := resp.Entry
if entry == nil { if entry == nil {
return fmt.Errorf("entry not found: %s", path)
return fmt.Errorf("%w: %s", ErrNotFound, path)
} }
// Remove the extended attribute // Remove the extended attribute

8
weed/s3api/s3tables/handler.go

@ -178,7 +178,9 @@ func (h *S3TablesHandler) writeJSON(w http.ResponseWriter, status int, data inte
w.Header().Set("Content-Type", "application/x-amz-json-1.1") w.Header().Set("Content-Type", "application/x-amz-json-1.1")
w.WriteHeader(status) w.WriteHeader(status)
if data != nil { if data != nil {
json.NewEncoder(w).Encode(data)
if err := json.NewEncoder(w).Encode(data); err != nil {
glog.Errorf("S3Tables: failed to encode response: %v", err)
}
} }
} }
@ -189,7 +191,9 @@ func (h *S3TablesHandler) writeError(w http.ResponseWriter, status int, code, me
"__type": code, "__type": code,
"message": message, "message": message,
} }
json.NewEncoder(w).Encode(errorResponse)
if err := json.NewEncoder(w).Encode(errorResponse); err != nil {
glog.Errorf("S3Tables: failed to encode error response: %v", err)
}
} }
// ARN generation helpers // ARN generation helpers

95
weed/s3api/s3tables/handler_bucket_get_list_delete.go

@ -2,6 +2,7 @@ package s3tables
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@ -77,51 +78,67 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http.
var buckets []TableBucketSummary var buckets []TableBucketSummary
var lastFileName string
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
Directory: TablesPath,
Limit: uint32(maxBuckets),
})
if err != nil {
return err
}
for {
entry, err := resp.Recv()
for len(buckets) < maxBuckets {
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
Directory: TablesPath,
Limit: uint32(maxBuckets * 2), // Fetch more than needed to account for filtering
StartFromFileName: lastFileName,
InclusiveStartFrom: lastFileName != "",
})
if err != nil { if err != nil {
break
return err
} }
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
hasMore := false
for {
entry, respErr := resp.Recv()
if respErr != nil {
break
}
hasMore = true
lastFileName = entry.Entry.Name
if !entry.Entry.IsDirectory {
continue
}
// Skip entries starting with "."
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
// Apply prefix filter
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
continue
}
// Read metadata from extended attribute
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
if !ok {
continue
}
var metadata tableBucketMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
continue
}
buckets = append(buckets, TableBucketSummary{
ARN: h.generateTableBucketARN(entry.Entry.Name),
Name: entry.Entry.Name,
CreatedAt: metadata.CreatedAt,
})
if len(buckets) >= maxBuckets {
return nil
}
} }
// Skip entries starting with "."
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
// Apply prefix filter
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
continue
}
// Read metadata from extended attribute
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
if !ok {
continue
}
var metadata tableBucketMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
continue
if !hasMore {
break
} }
buckets = append(buckets, TableBucketSummary{
ARN: h.generateTableBucketARN(entry.Entry.Name),
Name: entry.Entry.Name,
CreatedAt: metadata.CreatedAt,
})
} }
return nil return nil
@ -129,7 +146,7 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http.
if err != nil { if err != nil {
// Check if it's a "not found" error - return empty list in that case // Check if it's a "not found" error - return empty list in that case
if strings.Contains(err.Error(), "no entry is found") || strings.Contains(err.Error(), "not found") {
if errors.Is(err, ErrNotFound) {
buckets = []TableBucketSummary{} buckets = []TableBucketSummary{}
} else { } else {
// For other errors, return error response // For other errors, return error response

132
weed/s3api/s3tables/handler_namespace.go

@ -34,13 +34,10 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
return err return err
} }
// For simplicity, use the first namespace element as the directory name
namespaceName := req.Namespace[0]
// Validate namespace name
if len(namespaceName) < 1 || len(namespaceName) > 255 {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace name must be between 1 and 255 characters")
return fmt.Errorf("invalid namespace name length")
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
} }
// Check if table bucket exists // Check if table bucket exists
@ -122,9 +119,15 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required")
return fmt.Errorf("tableBucketARN and namespace are required")
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
} }
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
@ -133,7 +136,7 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ
return err return err
} }
namespacePath := getNamespacePath(bucketName, req.Namespace)
namespacePath := getNamespacePath(bucketName, namespaceName)
// Get namespace // Get namespace
var metadata namespaceMetadata var metadata namespaceMetadata
@ -146,7 +149,7 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ
}) })
if err != nil { if err != nil {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace))
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", flattenNamespace(req.Namespace)))
return err return err
} }
@ -187,51 +190,66 @@ func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Re
bucketPath := getTableBucketPath(bucketName) bucketPath := getTableBucketPath(bucketName)
var namespaces []NamespaceSummary var namespaces []NamespaceSummary
// List namespaces
var lastFileName string
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
Directory: bucketPath,
Limit: uint32(maxNamespaces),
})
if err != nil {
return err
}
for {
entry, err := resp.Recv()
for len(namespaces) < maxNamespaces {
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
Directory: bucketPath,
Limit: uint32(maxNamespaces * 2),
StartFromFileName: lastFileName,
InclusiveStartFrom: lastFileName != "",
})
if err != nil { if err != nil {
break
}
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
// Skip hidden entries
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
// Apply prefix filter
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
continue
return err
} }
// Read metadata from extended attribute
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
if !ok {
continue
hasMore := false
for {
entry, respErr := resp.Recv()
if respErr != nil {
break
}
hasMore = true
lastFileName = entry.Entry.Name
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
// Skip hidden entries
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
// Apply prefix filter
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
continue
}
// Read metadata from extended attribute
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
if !ok {
continue
}
var metadata namespaceMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
continue
}
namespaces = append(namespaces, NamespaceSummary{
Namespace: metadata.Namespace,
CreatedAt: metadata.CreatedAt,
})
if len(namespaces) >= maxNamespaces {
return nil
}
} }
var metadata namespaceMetadata
if err := json.Unmarshal(data, &metadata); err != nil {
continue
if !hasMore {
break
} }
namespaces = append(namespaces, NamespaceSummary{
Namespace: metadata.Namespace,
CreatedAt: metadata.CreatedAt,
})
} }
return nil return nil
@ -257,9 +275,15 @@ func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.R
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required")
return fmt.Errorf("tableBucketARN and namespace are required")
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
} }
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
@ -268,7 +292,7 @@ func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.R
return err return err
} }
namespacePath := getNamespacePath(bucketName, req.Namespace)
namespacePath := getNamespacePath(bucketName, namespaceName)
// Check if namespace exists and is empty // Check if namespace exists and is empty
hasChildren := false hasChildren := false

36
weed/s3api/s3tables/handler_policy.go

@ -2,9 +2,9 @@ package s3tables
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
) )
@ -125,7 +125,7 @@ func (h *S3TablesHandler) handleDeleteTableBucketPolicy(w http.ResponseWriter, r
return h.deleteExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy) return h.deleteExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
}) })
if err != nil && !strings.Contains(err.Error(), "not found") {
if err != nil && !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table bucket policy") h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table bucket policy")
return err return err
} }
@ -142,11 +142,17 @@ func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Re
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters") return fmt.Errorf("missing required parameters")
} }
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.ResourcePolicy == "" { if req.ResourcePolicy == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required")
return fmt.Errorf("resourcePolicy is required") return fmt.Errorf("resourcePolicy is required")
@ -159,7 +165,7 @@ func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Re
} }
// Check if table exists // Check if table exists
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
tablePath := getTablePath(bucketName, namespaceName, req.Name)
var tableExists bool var tableExists bool
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata) _, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
@ -194,18 +200,24 @@ func (h *S3TablesHandler) handleGetTablePolicy(w http.ResponseWriter, r *http.Re
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters") return fmt.Errorf("missing required parameters")
} }
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil { if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err return err
} }
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
tablePath := getTablePath(bucketName, namespaceName, req.Name)
var policy []byte var policy []byte
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
var readErr error var readErr error
@ -234,23 +246,29 @@ func (h *S3TablesHandler) handleDeleteTablePolicy(w http.ResponseWriter, r *http
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters") return fmt.Errorf("missing required parameters")
} }
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil { if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err return err
} }
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
tablePath := getTablePath(bucketName, namespaceName, req.Name)
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return h.deleteExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy) return h.deleteExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
}) })
if err != nil && !strings.Contains(err.Error(), "not found") {
if err != nil && !errors.Is(err, ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table policy") h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table policy")
return err return err
} }

140
weed/s3api/s3tables/handler_table.go

@ -24,9 +24,10 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
return fmt.Errorf("tableBucketARN is required") return fmt.Errorf("tableBucketARN is required")
} }
if req.Namespace == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required")
return fmt.Errorf("namespace is required")
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
} }
if req.Name == "" { if req.Name == "" {
@ -58,7 +59,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
} }
// Check if namespace exists // Check if namespace exists
namespacePath := getNamespacePath(bucketName, req.Namespace)
namespacePath := getNamespacePath(bucketName, namespaceName)
var namespaceExists bool var namespaceExists bool
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata) _, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
@ -67,11 +68,11 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
}) })
if !namespaceExists { if !namespaceExists {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace))
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", namespaceName))
return fmt.Errorf("namespace not found") return fmt.Errorf("namespace not found")
} }
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
tablePath := getTablePath(bucketName, namespaceName, req.Name)
// Check if table already exists // Check if table already exists
exists := false exists := false
@ -92,7 +93,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
metadata := &tableMetadataInternal{ metadata := &tableMetadataInternal{
Name: req.Name, Name: req.Name,
Namespace: req.Namespace,
Namespace: namespaceName,
Format: req.Format, Format: req.Format,
CreatedAt: now, CreatedAt: now,
ModifiedAt: now, ModifiedAt: now,
@ -143,7 +144,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
return err return err
} }
tableARN := h.generateTableARN(bucketName, req.Namespace, req.Name)
tableARN := h.generateTableARN(bucketName, namespaceName, req.Name)
resp := &CreateTableResponse{ resp := &CreateTableResponse{
TableARN: tableARN, TableARN: tableARN,
@ -172,13 +173,17 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request,
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err return err
} }
} else if req.TableBucketARN != "" && req.Namespace != "" && req.Name != "" {
} else if req.TableBucketARN != "" && len(req.Namespace) > 0 && req.Name != "" {
bucketName, err = parseBucketNameFromARN(req.TableBucketARN) bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
if err != nil { if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err return err
} }
namespace = req.Namespace
namespace, err = validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName = req.Name tableName = req.Name
} else { } else {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
@ -246,8 +251,15 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
var tables []TableSummary var tables []TableSummary
// If namespace is specified, list tables in that namespace only // If namespace is specified, list tables in that namespace only
if req.Namespace != "" {
err = h.listTablesInNamespace(r.Context(), filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables)
if len(req.Namespace) > 0 {
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return h.listTablesInNamespaceWithClient(r.Context(), client, bucketName, namespaceName, req.Prefix, maxTables, &tables)
})
} else { } else {
// List tables in all namespaces // List tables in all namespaces
err = h.listTablesInAllNamespaces(r.Context(), filerClient, bucketName, req.Prefix, maxTables, &tables) err = h.listTablesInAllNamespaces(r.Context(), filerClient, bucketName, req.Prefix, maxTables, &tables)
@ -265,23 +277,29 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
return nil return nil
} }
func (h *S3TablesHandler) listTablesInNamespace(ctx context.Context, filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
func (h *S3TablesHandler) listTablesInNamespaceWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
namespacePath := getNamespacePath(bucketName, namespace) namespacePath := getNamespacePath(bucketName, namespace)
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
var lastFileName string
for len(*tables) < maxTables {
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: namespacePath,
Limit: uint32(maxTables),
Directory: namespacePath,
Limit: uint32(maxTables * 2),
StartFromFileName: lastFileName,
InclusiveStartFrom: lastFileName != "",
}) })
if err != nil { if err != nil {
return err return err
} }
hasMore := false
for { for {
entry, err := resp.Recv()
if err != nil {
entry, respErr := resp.Recv()
if respErr != nil {
break break
} }
hasMore = true
lastFileName = entry.Entry.Name
if entry.Entry == nil || !entry.Entry.IsDirectory { if entry.Entry == nil || !entry.Entry.IsDirectory {
continue continue
@ -317,48 +335,68 @@ func (h *S3TablesHandler) listTablesInNamespace(ctx context.Context, filerClient
CreatedAt: metadata.CreatedAt, CreatedAt: metadata.CreatedAt,
ModifiedAt: metadata.ModifiedAt, ModifiedAt: metadata.ModifiedAt,
}) })
if len(*tables) >= maxTables {
return nil
}
} }
return nil
})
if !hasMore {
break
}
}
return nil
} }
func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error { func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error {
bucketPath := getTableBucketPath(bucketName) bucketPath := getTableBucketPath(bucketName)
var lastFileName string
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// List all namespaces first
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: bucketPath,
Limit: 1000,
})
if err != nil {
return err
}
for { for {
entry, err := resp.Recv()
// List namespaces in batches
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: bucketPath,
Limit: 100,
StartFromFileName: lastFileName,
InclusiveStartFrom: lastFileName != "",
})
if err != nil { if err != nil {
break
}
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
// Skip hidden entries
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
return err
} }
namespace := entry.Entry.Name
// List tables in this namespace
if err := h.listTablesInNamespace(ctx, filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
continue
hasMore := false
for {
entry, respErr := resp.Recv()
if respErr != nil {
break
}
hasMore = true
lastFileName = entry.Entry.Name
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
// Skip hidden entries
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
namespace := entry.Entry.Name
// List tables in this namespace
if err := h.listTablesInNamespaceWithClient(ctx, client, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
continue
}
if len(*tables) >= maxTables {
return nil
}
} }
if len(*tables) >= maxTables {
if !hasMore {
break break
} }
} }
@ -375,18 +413,24 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
return err return err
} }
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters") return fmt.Errorf("missing required parameters")
} }
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil { if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err return err
} }
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
tablePath := getTablePath(bucketName, namespaceName, req.Name)
// Check if table exists // Check if table exists
var tableExists bool var tableExists bool

56
weed/s3api/s3tables/types.go

@ -90,8 +90,8 @@ type CreateNamespaceResponse struct {
} }
type GetNamespaceRequest struct { type GetNamespaceRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
} }
type GetNamespaceResponse struct { type GetNamespaceResponse struct {
@ -118,8 +118,8 @@ type ListNamespacesResponse struct {
} }
type DeleteNamespaceRequest struct { type DeleteNamespaceRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
} }
// Table types // Table types
@ -156,7 +156,7 @@ type Table struct {
type CreateTableRequest struct { type CreateTableRequest struct {
TableBucketARN string `json:"tableBucketARN"` TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
Namespace []string `json:"namespace"`
Name string `json:"name"` Name string `json:"name"`
Format string `json:"format"` Format string `json:"format"`
Metadata *TableMetadata `json:"metadata,omitempty"` Metadata *TableMetadata `json:"metadata,omitempty"`
@ -170,10 +170,10 @@ type CreateTableResponse struct {
} }
type GetTableRequest struct { type GetTableRequest struct {
TableBucketARN string `json:"tableBucketARN,omitempty"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
TableARN string `json:"tableARN,omitempty"`
TableBucketARN string `json:"tableBucketARN,omitempty"`
Namespace []string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
TableARN string `json:"tableARN,omitempty"`
} }
type GetTableResponse struct { type GetTableResponse struct {
@ -189,11 +189,11 @@ type GetTableResponse struct {
} }
type ListTablesRequest struct { type ListTablesRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace,omitempty"`
Prefix string `json:"prefix,omitempty"`
ContinuationToken string `json:"continuationToken,omitempty"`
MaxTables int `json:"maxTables,omitempty"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace,omitempty"`
Prefix string `json:"prefix,omitempty"`
ContinuationToken string `json:"continuationToken,omitempty"`
MaxTables int `json:"maxTables,omitempty"`
} }
type TableSummary struct { type TableSummary struct {
@ -211,25 +211,25 @@ type ListTablesResponse struct {
} }
type DeleteTableRequest struct { type DeleteTableRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
Name string `json:"name"`
VersionToken string `json:"versionToken,omitempty"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
Name string `json:"name"`
VersionToken string `json:"versionToken,omitempty"`
} }
// Table policy types // Table policy types
type PutTablePolicyRequest struct { type PutTablePolicyRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
Name string `json:"name"`
ResourcePolicy string `json:"resourcePolicy"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
Name string `json:"name"`
ResourcePolicy string `json:"resourcePolicy"`
} }
type GetTablePolicyRequest struct { type GetTablePolicyRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
Name string `json:"name"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
Name string `json:"name"`
} }
type GetTablePolicyResponse struct { type GetTablePolicyResponse struct {
@ -237,9 +237,9 @@ type GetTablePolicyResponse struct {
} }
type DeleteTablePolicyRequest struct { type DeleteTablePolicyRequest struct {
TableBucketARN string `json:"tableBucketARN"`
Namespace string `json:"namespace"`
Name string `json:"name"`
TableBucketARN string `json:"tableBucketARN"`
Namespace []string `json:"namespace"`
Name string `json:"name"`
} }
// Tagging types // Tagging types

24
weed/s3api/s3tables/utils.go

@ -5,6 +5,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings"
"time" "time"
) )
@ -98,3 +99,26 @@ func splitPath(path string) (dir, name string) {
name = filepath.Base(path) name = filepath.Base(path)
return return
} }
// validateNamespace validates that the namespace provided is supported (single-level)
func validateNamespace(namespace []string) (string, error) {
if len(namespace) == 0 {
return "", fmt.Errorf("namespace is required")
}
if len(namespace) > 1 {
return "", fmt.Errorf("multi-level namespaces are not supported")
}
name := namespace[0]
if len(name) < 1 || len(name) > 255 {
return "", fmt.Errorf("namespace name must be between 1 and 255 characters")
}
return name, nil
}
// flattenNamespace joins namespace elements into a single string (using dots as per AWS S3 Tables)
func flattenNamespace(namespace []string) string {
if len(namespace) == 0 {
return ""
}
return strings.Join(namespace, ".")
}
Loading…
Cancel
Save