From 09bb90e8dc1647b30c62fceabca0172b38876af3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 28 Jan 2026 00:55:17 -0800 Subject: [PATCH] s3tables: split table bucket operations into focused modules - Create bucket_create.go for CreateTableBucket operation - Create bucket_get_list_delete.go for Get, List, Delete operations - Related operations grouped for better maintainability - Each file has a single, clear responsibility - Improves code clarity and makes it easier to test --- weed/s3api/s3tables/bucket_create.go | 97 +++++++++ weed/s3api/s3tables/bucket_get_list_delete.go | 200 ++++++++++++++++++ 2 files changed, 297 insertions(+) create mode 100644 weed/s3api/s3tables/bucket_create.go create mode 100644 weed/s3api/s3tables/bucket_get_list_delete.go diff --git a/weed/s3api/s3tables/bucket_create.go b/weed/s3api/s3tables/bucket_create.go new file mode 100644 index 000000000..2a27082a7 --- /dev/null +++ b/weed/s3api/s3tables/bucket_create.go @@ -0,0 +1,97 @@ +package s3tables + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// handleCreateTableBucket creates a new table bucket +func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req CreateTableBucketRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.Name == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required") + return fmt.Errorf("name is required") + } + + // Validate bucket name + if len(req.Name) < 3 || len(req.Name) > 63 { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "bucket name must be between 3 and 63 characters") + return fmt.Errorf("invalid bucket name length") + } + + bucketPath := getTableBucketPath(req.Name) + + // Check if bucket already exists + exists := false + err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: TablesPath, + Name: req.Name, + }) + if err == nil && resp.Entry != nil { + exists = true + } + return nil + }) + + if exists { + h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("table bucket %s already exists", req.Name)) + return fmt.Errorf("bucket already exists") + } + + // Create the bucket directory and set metadata as extended attributes + now := time.Now() + metadata := &tableBucketMetadata{ + Name: req.Name, + CreatedAt: now, + OwnerID: h.accountID, + } + + metadataBytes, _ := json.Marshal(metadata) + + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Create bucket directory + if err := h.createDirectory(client, bucketPath); err != nil { + return err + } + + // Set metadata as extended attribute + if err := h.setExtendedAttribute(client, bucketPath, ExtendedKeyMetadata, metadataBytes); err != nil { + return err + } + + // Set tags if provided + if len(req.Tags) > 0 { + tagsBytes, _ := json.Marshal(req.Tags) + if err := h.setExtendedAttribute(client, bucketPath, ExtendedKeyTags, tagsBytes); err != nil { + return err + } + } + + return nil + }) + + if err != nil { + glog.Errorf("S3Tables: failed to create table bucket %s: %v", req.Name, err) + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table bucket") + return err + } + + resp := &CreateTableBucketResponse{ + ARN: h.generateTableBucketARN(req.Name), + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} diff --git a/weed/s3api/s3tables/bucket_get_list_delete.go b/weed/s3api/s3tables/bucket_get_list_delete.go new file mode 100644 index 000000000..12775c3e5 --- /dev/null +++ b/weed/s3api/s3tables/bucket_get_list_delete.go @@ -0,0 +1,200 @@ +package s3tables + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// handleGetTableBucket gets details of a table bucket +func (h *S3TablesHandler) handleGetTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req GetTableBucketRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + bucketPath := getTableBucketPath(bucketName) + + var metadata tableBucketMetadata + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + data, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata) + if err != nil { + return err + } + return json.Unmarshal(data, &metadata) + }) + + if err != nil { + h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName)) + return err + } + + resp := &GetTableBucketResponse{ + ARN: h.generateTableBucketARN(bucketName), + Name: metadata.Name, + OwnerAccountID: metadata.OwnerID, + CreatedAt: metadata.CreatedAt, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleListTableBuckets lists all table buckets +func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req ListTableBucketsRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + maxBuckets := req.MaxBuckets + if maxBuckets <= 0 { + maxBuckets = 100 + } + + var buckets []TableBucketSummary + + err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: TablesPath, + Limit: uint32(maxBuckets), + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + + if entry.Entry == nil || !entry.Entry.IsDirectory { + continue + } + + // Skip entries starting with "." + if strings.HasPrefix(entry.Entry.Name, ".") { + continue + } + + // Apply prefix filter + if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { + continue + } + + // Read metadata from extended attribute + data, ok := entry.Entry.Extended[ExtendedKeyMetadata] + if !ok { + continue + } + + var metadata tableBucketMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + continue + } + + buckets = append(buckets, TableBucketSummary{ + ARN: h.generateTableBucketARN(entry.Entry.Name), + Name: entry.Entry.Name, + CreatedAt: metadata.CreatedAt, + }) + } + + return nil + }) + + if err != nil { + // If the directory doesn't exist, return empty list + buckets = []TableBucketSummary{} + } + + resp := &ListTableBucketsResponse{ + TableBuckets: buckets, + } + + h.writeJSON(w, http.StatusOK, resp) + return nil +} + +// handleDeleteTableBucket deletes a table bucket +func (h *S3TablesHandler) handleDeleteTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + var req DeleteTableBucketRequest + if err := h.readRequestBody(r, &req); err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + if req.TableBucketARN == "" { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") + return fmt.Errorf("tableBucketARN is required") + } + + bucketName, err := parseBucketNameFromARN(req.TableBucketARN) + if err != nil { + h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) + return err + } + + bucketPath := getTableBucketPath(bucketName) + + // Check if bucket exists and is empty + hasChildren := false + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: bucketPath, + Limit: 10, + }) + if err != nil { + return err + } + + for { + entry, err := resp.Recv() + if err != nil { + break + } + if entry.Entry != nil && !strings.HasPrefix(entry.Entry.Name, ".") { + hasChildren = true + break + } + } + + return nil + }) + + if hasChildren { + h.writeError(w, http.StatusConflict, ErrCodeBucketNotEmpty, "table bucket is not empty") + return fmt.Errorf("bucket not empty") + } + + // Delete the bucket + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return h.deleteDirectory(client, bucketPath) + }) + + if err != nil { + h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table bucket") + return err + } + + h.writeJSON(w, http.StatusOK, nil) + return nil +}