diff --git a/weed/s3api/s3tables/namespace.go b/weed/s3api/s3tables/namespace.go new file mode 100644 index 000000000..027ce8a8b --- /dev/null +++ b/weed/s3api/s3tables/namespace.go @@ -0,0 +1,310 @@ +package s3tables + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// handleCreateNamespace creates a new namespace in a table bucket +func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req CreateNamespaceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + if len(req.Namespace) == 0 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required") + return fmt.Errorf("namespace is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // For simplicity, use the first namespace element as the directory name + namespaceName := req.Namespace[0] + + // Validate namespace name + if len(namespaceName) < 1 || len(namespaceName) > 255 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace name must be between 1 and 255 characters") + return fmt.Errorf("invalid namespace name length") + } + + // Check if table bucket exists + bucketPath := getTableBucketPath(bucketName) + var bucketExists bool + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata) + bucketExists = err == nil + return nil + }) + + if !bucketExists { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName)) + return fmt.Errorf("bucket not found") + } + + namespacePath := getNamespacePath(bucketName, namespaceName) + + // Check if namespace already exists + exists := false + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata) + exists = err == nil + return nil + }) + + if exists { + h.writeError(w, http.StatusConflict, ErrCodeNamespaceAlreadyExists, fmt.Sprintf("namespace %s already exists", namespaceName)) + return fmt.Errorf("namespace already exists") + } + + // Create the namespace + now := time.Now() + metadata := &namespaceMetadata{ + Namespace: req.Namespace, + CreatedAt: now, + OwnerID: h.accountID, + } + + metadataBytes, _ := json.Marshal(metadata) + + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Create namespace directory + if err := h.createDirectory(client, namespacePath); err != nil { + return err + } + + // Set metadata as extended attribute + if err := h.setExtendedAttribute(client, namespacePath, ExtendedKeyMetadata, metadataBytes); err != nil { + return err + } + + return nil + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create namespace") + return err + } + + resp := &CreateNamespaceResponse{ + Namespace: req.Namespace, + TableBucketARN: req.TableBucketARN, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleGetNamespace gets details of a namespace +func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req GetNamespaceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required") + return fmt.Errorf("tableBucketARN and namespace are required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + namespacePath := getNamespacePath(bucketName, req.Namespace) + + var metadata namespaceMetadata + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata) + if err != nil { + return err + } + return json.Unmarshal(data, &metadata) + }) + + if err != nil { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace)) + return err + } + + resp := &GetNamespaceResponse{ + Namespace: metadata.Namespace, + CreatedAt: metadata.CreatedAt, + OwnerAccountID: metadata.OwnerID, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleListNamespaces lists all namespaces in a table bucket +func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req ListNamespacesRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + maxNamespaces := req.MaxNamespaces + if maxNamespaces <= 0 { + maxNamespaces = 100 + } + + bucketPath := getTableBucketPath(bucketName) + var namespaces []NamespaceSummary + + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: bucketPath, + Limit: uint32(maxNamespaces), + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip hidden entries + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + // Apply prefix filter + if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { + continue + } + + // Read metadata from extended attribute + data, ok := entry.Entry.Extended[ExtendedKeyMetadata] + if !ok { + continue + } + + var metadata namespaceMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + continue + } + + namespaces = append(namespaces, NamespaceSummary{ + Namespace: metadata.Namespace, + CreatedAt: metadata.CreatedAt, + }) + } + + return nil + }) + + if err != nil { + namespaces = []NamespaceSummary{} + } + + resp := &ListNamespacesResponse{ + Namespaces: namespaces, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleDeleteNamespace deletes a namespace from a table bucket +func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req DeleteNamespaceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required") + return fmt.Errorf("tableBucketARN and namespace are required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + namespacePath := getNamespacePath(bucketName, req.Namespace) + + // Check if namespace exists and is empty + hasChildren := false + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: namespacePath, + Limit: 10, + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + if entry.Entry != nil && !strings.HasPrefix(entry.Entry.Name, ".") { + hasChildren = true + break + } + } + + return nil + }) + + if hasChildren { + h.writeError(w, http.StatusConflict, ErrCodeNamespaceNotEmpty, "namespace is not empty") + return fmt.Errorf("namespace not empty") + } + + // Delete the namespace + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.deleteDirectory(client, namespacePath) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete namespace") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} diff --git a/weed/s3api/s3tables/policy.go b/weed/s3api/s3tables/policy.go new file mode 100644 index 000000000..bcb0c902a --- /dev/null +++ b/weed/s3api/s3tables/policy.go @@ -0,0 +1,419 @@ +package s3tables + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// handlePutTableBucketPolicy puts a policy on a table bucket +func (h *S3TablesHandler) handlePutTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req PutTableBucketPolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + if req.ResourcePolicy == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required") + return fmt.Errorf("resourcePolicy is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // Check if bucket exists + bucketPath := getTableBucketPath(bucketName) + var bucketExists bool + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata) + bucketExists = err == nil + return nil + }) + + if !bucketExists { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName)) + return fmt.Errorf("bucket not found") + } + + // Write policy + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.setExtendedAttribute(client, bucketPath, ExtendedKeyPolicy, []byte(req.ResourcePolicy)) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to put table bucket policy") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// handleGetTableBucketPolicy gets the policy of a table bucket +func (h *S3TablesHandler) handleGetTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req GetTableBucketPolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + bucketPath := getTableBucketPath(bucketName) + var policy []byte + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var readErr error + policy, readErr = h.getExtendedAttribute(client, bucketPath, ExtendedKeyPolicy) + return readErr + }) + + if err != nil { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchPolicy, "table bucket policy not found") + return err + } + + resp := &GetTableBucketPolicyResponse{ + ResourcePolicy: string(policy), + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleDeleteTableBucketPolicy deletes the policy of a table bucket +func (h *S3TablesHandler) handleDeleteTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req DeleteTableBucketPolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + bucketPath := getTableBucketPath(bucketName) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.deleteExtendedAttribute(client, bucketPath, ExtendedKeyPolicy) + }) + + if err != nil { + // Ignore error if policy doesn't exist + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// handlePutTablePolicy puts a policy on a table +func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req PutTablePolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") + return fmt.Errorf("missing required parameters") + } + + if req.ResourcePolicy == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required") + return fmt.Errorf("resourcePolicy is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // Check if table exists + tablePath := getTablePath(bucketName, req.Namespace, req.Name) + var tableExists bool + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata) + tableExists = err == nil + return nil + }) + + if !tableExists { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", req.Name)) + return fmt.Errorf("table not found") + } + + // Write policy + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.setExtendedAttribute(client, tablePath, ExtendedKeyPolicy, []byte(req.ResourcePolicy)) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to put table policy") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// handleGetTablePolicy gets the policy of a table +func (h *S3TablesHandler) handleGetTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req GetTablePolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") + return fmt.Errorf("missing required parameters") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + tablePath := getTablePath(bucketName, req.Namespace, req.Name) + var policy []byte + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var readErr error + policy, readErr = h.getExtendedAttribute(client, tablePath, ExtendedKeyPolicy) + return readErr + }) + + if err != nil { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchPolicy, "table policy not found") + return err + } + + resp := &GetTablePolicyResponse{ + ResourcePolicy: string(policy), + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleDeleteTablePolicy deletes the policy of a table +func (h *S3TablesHandler) handleDeleteTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req DeleteTablePolicyRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") + return fmt.Errorf("missing required parameters") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + tablePath := getTablePath(bucketName, req.Namespace, req.Name) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.deleteExtendedAttribute(client, tablePath, ExtendedKeyPolicy) + }) + + if err != nil { + // Ignore error if policy doesn't exist + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// handleTagResource adds tags to a resource +func (h *S3TablesHandler) handleTagResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req TagResourceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.ResourceARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required") + return fmt.Errorf("resourceArn is required") + } + + if len(req.Tags) == 0 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tags are required") + return fmt.Errorf("tags are required") + } + + // Parse resource ARN to determine if it's a bucket or table + resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // Read existing tags and merge + existingTags := make(map[string]string) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, resourcePath, extendedKey) + if err == nil { + json.Unmarshal(data, &existingTags) + } + return nil + }) + + // Merge new tags + for k, v := range req.Tags { + existingTags[k] = v + } + + // Write merged tags + tagsBytes, _ := json.Marshal(existingTags) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.setExtendedAttribute(client, resourcePath, extendedKey, tagsBytes) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to tag resource") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// handleListTagsForResource lists tags for a resource +func (h *S3TablesHandler) handleListTagsForResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req ListTagsForResourceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.ResourceARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required") + return fmt.Errorf("resourceArn is required") + } + + resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + tags := make(map[string]string) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, resourcePath, extendedKey) + if err != nil { + return nil // Return empty tags if not found + } + return json.Unmarshal(data, &tags) + }) + + resp := &ListTagsForResourceResponse{ + Tags: tags, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleUntagResource removes tags from a resource +func (h *S3TablesHandler) handleUntagResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req UntagResourceRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.ResourceARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required") + return fmt.Errorf("resourceArn is required") + } + + if len(req.TagKeys) == 0 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tagKeys are required") + return fmt.Errorf("tagKeys are required") + } + + resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // Read existing tags + tags := make(map[string]string) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, resourcePath, extendedKey) + if err != nil { + return nil + } + return json.Unmarshal(data, &tags) + }) + + // Remove specified tags + for _, key := range req.TagKeys { + delete(tags, key) + } + + // Write updated tags + tagsBytes, _ := json.Marshal(tags) + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.setExtendedAttribute(client, resourcePath, extendedKey, tagsBytes) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to untag resource") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} + +// resolveResourcePath determines the resource path and extended attribute key from a resource ARN +func (h *S3TablesHandler) resolveResourcePath(resourceARN string) (path string, key string, err error) { + // Try parsing as table ARN first + bucketName, namespace, tableName, err := parseTableFromARN(resourceARN) + if err == nil { + return getTablePath(bucketName, namespace, tableName), ExtendedKeyTags, nil + } + + // Try parsing as bucket ARN + bucketName, err = parseBucketNameFromARN(resourceARN) + if err == nil { + return getTableBucketPath(bucketName), ExtendedKeyTags, nil + } + + return "", "", fmt.Errorf("invalid resource ARN: %s", resourceARN) +} diff --git a/weed/s3api/s3tables/table.go b/weed/s3api/s3tables/table.go new file mode 100644 index 000000000..3a96f3650 --- /dev/null +++ b/weed/s3api/s3tables/table.go @@ -0,0 +1,409 @@ +package s3tables + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// handleCreateTable creates a new table in a namespace +func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req CreateTableRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + if req.Namespace == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required") + return fmt.Errorf("namespace is required") + } + + if req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required") + return fmt.Errorf("name is required") + } + + if req.Format == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required") + return fmt.Errorf("format is required") + } + + // Validate format + if req.Format != "ICEBERG" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported") + return fmt.Errorf("invalid format") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + // Validate table name + if len(req.Name) < 1 || len(req.Name) > 255 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "table name must be between 1 and 255 characters") + return fmt.Errorf("invalid table name length") + } + + // Check if namespace exists + namespacePath := getNamespacePath(bucketName, req.Namespace) + var namespaceExists bool + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata) + namespaceExists = err == nil + return nil + }) + + if !namespaceExists { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace)) + return fmt.Errorf("namespace not found") + } + + tablePath := getTablePath(bucketName, req.Namespace, req.Name) + + // Check if table already exists + exists := false + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata) + exists = err == nil + return nil + }) + + if exists { + h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", req.Name)) + return fmt.Errorf("table already exists") + } + + // Create the table + now := time.Now() + versionToken := generateVersionToken() + + metadata := &tableMetadataInternal{ + Name: req.Name, + Namespace: req.Namespace, + Format: req.Format, + CreatedAt: now, + ModifiedAt: now, + OwnerID: h.accountID, + VersionToken: versionToken, + Schema: req.Metadata, + } + + metadataBytes, _ := json.Marshal(metadata) + + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Create table directory + if err := h.createDirectory(client, tablePath); err != nil { + return err + } + + // Create data subdirectory for Iceberg files + dataPath := tablePath + "/data" + if err := h.createDirectory(client, dataPath); err != nil { + return err + } + + // Set metadata as extended attribute + if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil { + return err + } + + // Set tags if provided + if len(req.Tags) > 0 { + tagsBytes, _ := json.Marshal(req.Tags) + if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyTags, tagsBytes); err != nil { + return err + } + } + + return nil + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table") + return err + } + + tableARN := h.generateTableARN(bucketName, req.Namespace, req.Name) + + resp := &CreateTableResponse{ + TableARN: tableARN, + VersionToken: versionToken, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleGetTable gets details of a table +func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req GetTableRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + var bucketName, namespace, tableName string + var err error + + // Support getting by ARN or by bucket/namespace/name + if req.TableARN != "" { + bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + } else if req.TableBucketARN != "" && req.Namespace != "" && req.Name != "" { + bucketName, err = parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + namespace = req.Namespace + tableName = req.Name + } else { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required") + return fmt.Errorf("missing required parameters") + } + + tablePath := getTablePath(bucketName, namespace, tableName) + + var metadata tableMetadataInternal + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata) + if err != nil { + return err + } + return json.Unmarshal(data, &metadata) + }) + + if err != nil { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName)) + return err + } + + tableARN := h.generateTableARN(bucketName, namespace, tableName) + + resp := &GetTableResponse{ + Name: metadata.Name, + TableARN: tableARN, + Namespace: []string{metadata.Namespace}, + Format: metadata.Format, + CreatedAt: metadata.CreatedAt, + ModifiedAt: metadata.ModifiedAt, + OwnerAccountID: metadata.OwnerID, + MetadataLocation: metadata.MetadataLocation, + VersionToken: metadata.VersionToken, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleListTables lists all tables in a namespace or bucket +func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req ListTablesRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + maxTables := req.MaxTables + if maxTables <= 0 { + maxTables = 100 + } + + var tables []TableSummary + + // If namespace is specified, list tables in that namespace only + if req.Namespace != "" { + err = h.listTablesInNamespace(filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables) + } else { + // List tables in all namespaces + err = h.listTablesInAllNamespaces(filerClient, bucketName, req.Prefix, maxTables, &tables) + } + + if err != nil { + tables = []TableSummary{} + } + + resp := &ListTablesResponse{ + Tables: tables, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +func (h *S3TablesHandler) listTablesInNamespace(filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error { + namespacePath := getNamespacePath(bucketName, namespace) + + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: namespacePath, + Limit: uint32(maxTables), + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip hidden entries + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + // Apply prefix filter + if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) { + continue + } + + // Read table metadata from extended attribute + data, ok := entry.Entry.Extended[ExtendedKeyMetadata] + if !ok { + continue + } + + var metadata tableMetadataInternal + if err := json.Unmarshal(data, &metadata); err != nil { + continue + } + + tableARN := h.generateTableARN(bucketName, namespace, entry.Entry.Name) + + *tables = append(*tables, TableSummary{ + Name: metadata.Name, + TableARN: tableARN, + Namespace: []string{namespace}, + CreatedAt: metadata.CreatedAt, + ModifiedAt: metadata.ModifiedAt, + }) + } + + return nil + }) +} + +func (h *S3TablesHandler) listTablesInAllNamespaces(filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error { + bucketPath := getTableBucketPath(bucketName) + + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // List all namespaces first + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: bucketPath, + Limit: 1000, + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip hidden entries + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + namespace := entry.Entry.Name + + // List tables in this namespace + if err := h.listTablesInNamespace(filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil { + continue + } + + if len(*tables) >= maxTables { + break + } + } + + return nil + }) +} + +// handleDeleteTable deletes a table from a namespace +func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req DeleteTableRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required") + return fmt.Errorf("missing required parameters") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + tablePath := getTablePath(bucketName, req.Namespace, req.Name) + + // Check if table exists + var tableExists bool + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata) + tableExists = err == nil + return nil + }) + + if !tableExists { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", req.Name)) + return fmt.Errorf("table not found") + } + + // Delete the table + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.deleteDirectory(client, tablePath) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +} diff --git a/weed/s3api/s3tables/types.go b/weed/s3api/s3tables/types.go new file mode 100644 index 000000000..281ee73fe --- /dev/null +++ b/weed/s3api/s3tables/types.go @@ -0,0 +1,290 @@ +package s3tables + +import "time" + +// Table bucket types + +type TableBucket struct { + ARN string `json:"arn"` + Name string `json:"name"` + OwnerID string `json:"ownerAccountId"` + CreatedAt time.Time `json:"createdAt"` +} + +type CreateTableBucketRequest struct { + Name string `json:"name"` + Tags map[string]string `json:"tags,omitempty"` +} + +type CreateTableBucketResponse struct { + ARN string `json:"arn"` +} + +type GetTableBucketRequest struct { + TableBucketARN string `json:"tableBucketARN"` +} + +type GetTableBucketResponse struct { + ARN string `json:"arn"` + Name string `json:"name"` + OwnerAccountID string `json:"ownerAccountId"` + CreatedAt time.Time `json:"createdAt"` +} + +type ListTableBucketsRequest struct { + Prefix string `json:"prefix,omitempty"` + ContinuationToken string `json:"continuationToken,omitempty"` + MaxBuckets int `json:"maxBuckets,omitempty"` +} + +type TableBucketSummary struct { + ARN string `json:"arn"` + Name string `json:"name"` + CreatedAt time.Time `json:"createdAt"` +} + +type ListTableBucketsResponse struct { + TableBuckets []TableBucketSummary `json:"tableBuckets"` + ContinuationToken string `json:"continuationToken,omitempty"` +} + +type DeleteTableBucketRequest struct { + TableBucketARN string `json:"tableBucketARN"` +} + +// Table bucket policy types + +type PutTableBucketPolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` + ResourcePolicy string `json:"resourcePolicy"` +} + +type GetTableBucketPolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` +} + +type GetTableBucketPolicyResponse struct { + ResourcePolicy string `json:"resourcePolicy"` +} + +type DeleteTableBucketPolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` +} + +// Namespace types + +type Namespace struct { + Namespace []string `json:"namespace"` + CreatedAt time.Time `json:"createdAt"` + OwnerAccountID string `json:"ownerAccountId"` +} + +type CreateNamespaceRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace []string `json:"namespace"` +} + +type CreateNamespaceResponse struct { + Namespace []string `json:"namespace"` + TableBucketARN string `json:"tableBucketARN"` +} + +type GetNamespaceRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` +} + +type GetNamespaceResponse struct { + Namespace []string `json:"namespace"` + CreatedAt time.Time `json:"createdAt"` + OwnerAccountID string `json:"ownerAccountId"` +} + +type ListNamespacesRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Prefix string `json:"prefix,omitempty"` + ContinuationToken string `json:"continuationToken,omitempty"` + MaxNamespaces int `json:"maxNamespaces,omitempty"` +} + +type NamespaceSummary struct { + Namespace []string `json:"namespace"` + CreatedAt time.Time `json:"createdAt"` +} + +type ListNamespacesResponse struct { + Namespaces []NamespaceSummary `json:"namespaces"` + ContinuationToken string `json:"continuationToken,omitempty"` +} + +type DeleteNamespaceRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` +} + +// Table types + +type IcebergSchemaField struct { + Name string `json:"name"` + Type string `json:"type"` + Required bool `json:"required,omitempty"` +} + +type IcebergSchema struct { + Fields []IcebergSchemaField `json:"fields"` +} + +type IcebergMetadata struct { + Schema IcebergSchema `json:"schema"` +} + +type TableMetadata struct { + Iceberg *IcebergMetadata `json:"iceberg,omitempty"` +} + +type Table struct { + Name string `json:"name"` + TableARN string `json:"tableARN"` + Namespace []string `json:"namespace"` + Format string `json:"format"` + CreatedAt time.Time `json:"createdAt"` + ModifiedAt time.Time `json:"modifiedAt"` + OwnerAccountID string `json:"ownerAccountId"` + MetadataLocation string `json:"metadataLocation,omitempty"` + Metadata *TableMetadata `json:"metadata,omitempty"` +} + +type CreateTableRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` + Name string `json:"name"` + Format string `json:"format"` + Metadata *TableMetadata `json:"metadata,omitempty"` + Tags map[string]string `json:"tags,omitempty"` +} + +type CreateTableResponse struct { + TableARN string `json:"tableARN"` + VersionToken string `json:"versionToken"` + MetadataLocation string `json:"metadataLocation,omitempty"` +} + +type GetTableRequest struct { + TableBucketARN string `json:"tableBucketARN,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + TableARN string `json:"tableARN,omitempty"` +} + +type GetTableResponse struct { + Name string `json:"name"` + TableARN string `json:"tableARN"` + Namespace []string `json:"namespace"` + Format string `json:"format"` + CreatedAt time.Time `json:"createdAt"` + ModifiedAt time.Time `json:"modifiedAt"` + OwnerAccountID string `json:"ownerAccountId"` + MetadataLocation string `json:"metadataLocation,omitempty"` + VersionToken string `json:"versionToken"` +} + +type ListTablesRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace,omitempty"` + Prefix string `json:"prefix,omitempty"` + ContinuationToken string `json:"continuationToken,omitempty"` + MaxTables int `json:"maxTables,omitempty"` +} + +type TableSummary struct { + Name string `json:"name"` + TableARN string `json:"tableARN"` + Namespace []string `json:"namespace"` + CreatedAt time.Time `json:"createdAt"` + ModifiedAt time.Time `json:"modifiedAt"` + MetadataLocation string `json:"metadataLocation,omitempty"` +} + +type ListTablesResponse struct { + Tables []TableSummary `json:"tables"` + ContinuationToken string `json:"continuationToken,omitempty"` +} + +type DeleteTableRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` + Name string `json:"name"` + VersionToken string `json:"versionToken,omitempty"` +} + +// Table policy types + +type PutTablePolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` + Name string `json:"name"` + ResourcePolicy string `json:"resourcePolicy"` +} + +type GetTablePolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` + Name string `json:"name"` +} + +type GetTablePolicyResponse struct { + ResourcePolicy string `json:"resourcePolicy"` +} + +type DeleteTablePolicyRequest struct { + TableBucketARN string `json:"tableBucketARN"` + Namespace string `json:"namespace"` + Name string `json:"name"` +} + +// Tagging types + +type TagResourceRequest struct { + ResourceARN string `json:"resourceArn"` + Tags map[string]string `json:"tags"` +} + +type ListTagsForResourceRequest struct { + ResourceARN string `json:"resourceArn"` +} + +type ListTagsForResourceResponse struct { + Tags map[string]string `json:"tags"` +} + +type UntagResourceRequest struct { + ResourceARN string `json:"resourceArn"` + TagKeys []string `json:"tagKeys"` +} + +// Error types + +type S3TablesError struct { + Type string `json:"__type"` + Message string `json:"message"` +} + +func (e *S3TablesError) Error() string { + return e.Message +} + +// Error codes +const ( + ErrCodeBucketAlreadyExists = "BucketAlreadyExists" + ErrCodeBucketNotEmpty = "BucketNotEmpty" + ErrCodeNoSuchBucket = "NoSuchBucket" + ErrCodeNoSuchNamespace = "NoSuchNamespace" + ErrCodeNoSuchTable = "NoSuchTable" + ErrCodeNamespaceAlreadyExists = "NamespaceAlreadyExists" + ErrCodeNamespaceNotEmpty = "NamespaceNotEmpty" + ErrCodeTableAlreadyExists = "TableAlreadyExists" + ErrCodeAccessDenied = "AccessDenied" + ErrCodeInvalidRequest = "InvalidRequest" + ErrCodeInternalError = "InternalError" + ErrCodeNoSuchPolicy = "NoSuchPolicy" +)