From d730f8132106ebceecc0a80fef381ac0b7ea87ba Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 28 Jan 2026 00:55:11 -0800 Subject: [PATCH] s3tables: extract utility and filer operations to separate modules - Move ARN parsing, path helpers, and metadata structures to utils.go - Extract all extended attribute and filer operations to filer_ops.go - Reduces code duplication and improves modularity - Improves code organization and maintainability --- weed/s3api/s3tables/filer_ops.go | 138 +++++++++++++++++++++++++++++++ weed/s3api/s3tables/utils.go | 109 ++++++++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 weed/s3api/s3tables/filer_ops.go create mode 100644 weed/s3api/s3tables/utils.go diff --git a/weed/s3api/s3tables/filer_ops.go b/weed/s3api/s3tables/filer_ops.go new file mode 100644 index 000000000..9e9c21bb8 --- /dev/null +++ b/weed/s3api/s3tables/filer_ops.go @@ -0,0 +1,138 @@ +package s3tables + +import ( + "context" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// Filer operations - Common functions for interacting with the filer + +// createDirectory creates a new directory at the specified path +func (h *S3TablesHandler) createDirectory(client filer_pb.SeaweedFilerClient, path string) error { + dir, name := splitPath(path) + _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0755 | 1<<31), // Directory mode + }, + }, + }) + return err +} + +// setExtendedAttribute sets an extended attribute on an existing entry +func (h *S3TablesHandler) setExtendedAttribute(client filer_pb.SeaweedFilerClient, path, key string, data []byte) error { + dir, name := splitPath(path) + + // First, get the existing entry + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err != nil { + return err + } + + entry := resp.Entry + if entry == nil { + return fmt.Errorf("entry not found: %s", path) + } + + // Update the extended attributes + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[key] = data + + // Save the updated entry + _, err = client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err +} + +// getExtendedAttribute gets an extended attribute from an entry +func (h *S3TablesHandler) getExtendedAttribute(client filer_pb.SeaweedFilerClient, path, key string) ([]byte, error) { + dir, name := splitPath(path) + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err != nil { + return nil, err + } + + if resp.Entry == nil { + return nil, fmt.Errorf("entry not found: %s", path) + } + + data, ok := resp.Entry.Extended[key] + if !ok { + return nil, fmt.Errorf("attribute not found: %s", key) + } + + return data, nil +} + +// deleteExtendedAttribute deletes an extended attribute from an entry +func (h *S3TablesHandler) deleteExtendedAttribute(client filer_pb.SeaweedFilerClient, path, key string) error { + dir, name := splitPath(path) + + // Get the existing entry + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if err != nil { + return err + } + + entry := resp.Entry + if entry == nil { + return fmt.Errorf("entry not found: %s", path) + } + + // Remove the extended attribute + if entry.Extended != nil { + delete(entry.Extended, key) + } + + // Save the updated entry + _, err = client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err +} + +// deleteDirectory deletes a directory and all its contents +func (h *S3TablesHandler) deleteDirectory(client filer_pb.SeaweedFilerClient, path string) error { + dir, name := splitPath(path) + _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ + Directory: dir, + Name: name, + IsDeleteData: true, + IsRecursive: true, + IgnoreRecursiveError: true, + }) + return err +} + +// entryExists checks if an entry exists at the given path +func (h *S3TablesHandler) entryExists(client filer_pb.SeaweedFilerClient, path string) bool { + dir, name := splitPath(path) + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + return err == nil && resp.Entry != nil +} diff --git a/weed/s3api/s3tables/utils.go b/weed/s3api/s3tables/utils.go new file mode 100644 index 000000000..245533be6 --- /dev/null +++ b/weed/s3api/s3tables/utils.go @@ -0,0 +1,109 @@ +package s3tables + +import ( + "fmt" + "regexp" + "time" +) + +// ARN parsing functions + +// parseBucketNameFromARN extracts bucket name from table bucket ARN +// ARN format: arn:aws:s3tables:{region}:{account}:bucket/{bucket-name} +func parseBucketNameFromARN(arn string) (string, error) { + pattern := regexp.MustCompile(`^arn:aws:s3tables:[^:]*:[^:]*:bucket/([a-z0-9_-]+)$`) + matches := pattern.FindStringSubmatch(arn) + if len(matches) != 2 { + return "", fmt.Errorf("invalid bucket ARN: %s", arn) + } + return matches[1], nil +} + +// parseTableFromARN extracts bucket name, namespace, and table name from ARN +// ARN format: arn:aws:s3tables:{region}:{account}:bucket/{bucket-name}/table/{namespace}/{table-name} +func parseTableFromARN(arn string) (bucketName, namespace, tableName string, err error) { + pattern := regexp.MustCompile(`^arn:aws:s3tables:[^:]*:[^:]*:bucket/([a-z0-9_-]+)/table/([a-z0-9_]+)/([a-z0-9_]+)$`) + matches := pattern.FindStringSubmatch(arn) + if len(matches) != 4 { + return "", "", "", fmt.Errorf("invalid table ARN: %s", arn) + } + return matches[1], matches[2], matches[3], nil +} + +// Path helpers + +// getTableBucketPath returns the filer path for a table bucket +func getTableBucketPath(bucketName string) string { + return fmt.Sprintf("%s/%s", TablesPath, bucketName) +} + +// getNamespacePath returns the filer path for a namespace +func getNamespacePath(bucketName, namespace string) string { + return fmt.Sprintf("%s/%s/namespaces/%s", TablesPath, bucketName, namespace) +} + +// getTablePath returns the filer path for a table +func getTablePath(bucketName, namespace, tableName string) string { + return fmt.Sprintf("%s/%s/namespaces/%s/%s", TablesPath, bucketName, namespace, tableName) +} + +// Metadata structures + +// tableBucketMetadata stores metadata for a table bucket +type tableBucketMetadata struct { + Name string `json:"name"` + CreatedAt time.Time `json:"createdAt"` + OwnerID string `json:"ownerAccountId"` +} + +// namespaceMetadata stores metadata for a namespace +type namespaceMetadata struct { + Namespace []string `json:"namespace"` + CreatedAt time.Time `json:"createdAt"` + OwnerID string `json:"ownerAccountId"` +} + +// tableMetadataInternal stores metadata for a table +type tableMetadataInternal struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + Format string `json:"format"` + CreatedAt time.Time `json:"createdAt"` + ModifiedAt time.Time `json:"modifiedAt"` + OwnerID string `json:"ownerAccountId"` + VersionToken string `json:"versionToken"` + MetadataLocation string `json:"metadataLocation,omitempty"` + Schema *TableMetadata `json:"metadata,omitempty"` +} + +// Utility functions + +// generateVersionToken generates a unique version token +func generateVersionToken() string { + return fmt.Sprintf("%d", time.Now().UnixNano()) +} + +// splitPath splits a path into directory and name components +func splitPath(path string) (dir, name string) { + var idx int + var i int + + // Remove trailing slash + for i = len(path) - 1; i >= 0 && path[i] == '/'; i-- { + } + path = path[:i+1] + + // Find last separator + idx = len(path) - 1 + for idx >= 0 && path[idx] != '/' { + idx-- + } + + if idx == -1 { + return "/", path + } + if idx == 0 { + return "/", path[1:] + } + return path[:idx], path[idx+1:] +}