Browse Source
s3tables: split table bucket operations into focused modules
s3tables: split table bucket operations into focused modules
- Create bucket_create.go for CreateTableBucket operation - Create bucket_get_list_delete.go for Get, List, Delete operations - Related operations grouped for better maintainability - Each file has a single, clear responsibility - Improves code clarity and makes it easier to testpull/8147/head
2 changed files with 297 additions and 0 deletions
@ -0,0 +1,97 @@ |
|||
package s3tables |
|||
|
|||
import ( |
|||
"context" |
|||
"encoding/json" |
|||
"fmt" |
|||
"net/http" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
// handleCreateTableBucket creates a new table bucket
|
|||
func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { |
|||
var req CreateTableBucketRequest |
|||
if err := h.readRequestBody(r, &req); err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
if req.Name == "" { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required") |
|||
return fmt.Errorf("name is required") |
|||
} |
|||
|
|||
// Validate bucket name
|
|||
if len(req.Name) < 3 || len(req.Name) > 63 { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "bucket name must be between 3 and 63 characters") |
|||
return fmt.Errorf("invalid bucket name length") |
|||
} |
|||
|
|||
bucketPath := getTableBucketPath(req.Name) |
|||
|
|||
// Check if bucket already exists
|
|||
exists := false |
|||
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ |
|||
Directory: TablesPath, |
|||
Name: req.Name, |
|||
}) |
|||
if err == nil && resp.Entry != nil { |
|||
exists = true |
|||
} |
|||
return nil |
|||
}) |
|||
|
|||
if exists { |
|||
h.writeError(w, http.StatusConflict, ErrCodeBucketAlreadyExists, fmt.Sprintf("table bucket %s already exists", req.Name)) |
|||
return fmt.Errorf("bucket already exists") |
|||
} |
|||
|
|||
// Create the bucket directory and set metadata as extended attributes
|
|||
now := time.Now() |
|||
metadata := &tableBucketMetadata{ |
|||
Name: req.Name, |
|||
CreatedAt: now, |
|||
OwnerID: h.accountID, |
|||
} |
|||
|
|||
metadataBytes, _ := json.Marshal(metadata) |
|||
|
|||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
// Create bucket directory
|
|||
if err := h.createDirectory(client, bucketPath); err != nil { |
|||
return err |
|||
} |
|||
|
|||
// Set metadata as extended attribute
|
|||
if err := h.setExtendedAttribute(client, bucketPath, ExtendedKeyMetadata, metadataBytes); err != nil { |
|||
return err |
|||
} |
|||
|
|||
// Set tags if provided
|
|||
if len(req.Tags) > 0 { |
|||
tagsBytes, _ := json.Marshal(req.Tags) |
|||
if err := h.setExtendedAttribute(client, bucketPath, ExtendedKeyTags, tagsBytes); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if err != nil { |
|||
glog.Errorf("S3Tables: failed to create table bucket %s: %v", req.Name, err) |
|||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table bucket") |
|||
return err |
|||
} |
|||
|
|||
resp := &CreateTableBucketResponse{ |
|||
ARN: h.generateTableBucketARN(req.Name), |
|||
} |
|||
|
|||
h.writeJSON(w, http.StatusOK, resp) |
|||
return nil |
|||
} |
|||
@ -0,0 +1,200 @@ |
|||
package s3tables |
|||
|
|||
import ( |
|||
"context" |
|||
"encoding/json" |
|||
"fmt" |
|||
"net/http" |
|||
"strings" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
// handleGetTableBucket gets details of a table bucket
|
|||
func (h *S3TablesHandler) handleGetTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { |
|||
var req GetTableBucketRequest |
|||
if err := h.readRequestBody(r, &req); err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
if req.TableBucketARN == "" { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") |
|||
return fmt.Errorf("tableBucketARN is required") |
|||
} |
|||
|
|||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) |
|||
if err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
bucketPath := getTableBucketPath(bucketName) |
|||
|
|||
var metadata tableBucketMetadata |
|||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
data, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
return json.Unmarshal(data, &metadata) |
|||
}) |
|||
|
|||
if err != nil { |
|||
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName)) |
|||
return err |
|||
} |
|||
|
|||
resp := &GetTableBucketResponse{ |
|||
ARN: h.generateTableBucketARN(bucketName), |
|||
Name: metadata.Name, |
|||
OwnerAccountID: metadata.OwnerID, |
|||
CreatedAt: metadata.CreatedAt, |
|||
} |
|||
|
|||
h.writeJSON(w, http.StatusOK, resp) |
|||
return nil |
|||
} |
|||
|
|||
// handleListTableBuckets lists all table buckets
|
|||
func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { |
|||
var req ListTableBucketsRequest |
|||
if err := h.readRequestBody(r, &req); err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
maxBuckets := req.MaxBuckets |
|||
if maxBuckets <= 0 { |
|||
maxBuckets = 100 |
|||
} |
|||
|
|||
var buckets []TableBucketSummary |
|||
|
|||
err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ |
|||
Directory: TablesPath, |
|||
Limit: uint32(maxBuckets), |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
for { |
|||
entry, err := resp.Recv() |
|||
if err != nil { |
|||
break |
|||
} |
|||
|
|||
if entry.Entry == nil || !entry.Entry.IsDirectory { |
|||
continue |
|||
} |
|||
|
|||
// Skip entries starting with "."
|
|||
if strings.HasPrefix(entry.Entry.Name, ".") { |
|||
continue |
|||
} |
|||
|
|||
// Apply prefix filter
|
|||
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) { |
|||
continue |
|||
} |
|||
|
|||
// Read metadata from extended attribute
|
|||
data, ok := entry.Entry.Extended[ExtendedKeyMetadata] |
|||
if !ok { |
|||
continue |
|||
} |
|||
|
|||
var metadata tableBucketMetadata |
|||
if err := json.Unmarshal(data, &metadata); err != nil { |
|||
continue |
|||
} |
|||
|
|||
buckets = append(buckets, TableBucketSummary{ |
|||
ARN: h.generateTableBucketARN(entry.Entry.Name), |
|||
Name: entry.Entry.Name, |
|||
CreatedAt: metadata.CreatedAt, |
|||
}) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if err != nil { |
|||
// If the directory doesn't exist, return empty list
|
|||
buckets = []TableBucketSummary{} |
|||
} |
|||
|
|||
resp := &ListTableBucketsResponse{ |
|||
TableBuckets: buckets, |
|||
} |
|||
|
|||
h.writeJSON(w, http.StatusOK, resp) |
|||
return nil |
|||
} |
|||
|
|||
// handleDeleteTableBucket deletes a table bucket
|
|||
func (h *S3TablesHandler) handleDeleteTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { |
|||
var req DeleteTableBucketRequest |
|||
if err := h.readRequestBody(r, &req); err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
if req.TableBucketARN == "" { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required") |
|||
return fmt.Errorf("tableBucketARN is required") |
|||
} |
|||
|
|||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN) |
|||
if err != nil { |
|||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) |
|||
return err |
|||
} |
|||
|
|||
bucketPath := getTableBucketPath(bucketName) |
|||
|
|||
// Check if bucket exists and is empty
|
|||
hasChildren := false |
|||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ |
|||
Directory: bucketPath, |
|||
Limit: 10, |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
for { |
|||
entry, err := resp.Recv() |
|||
if err != nil { |
|||
break |
|||
} |
|||
if entry.Entry != nil && !strings.HasPrefix(entry.Entry.Name, ".") { |
|||
hasChildren = true |
|||
break |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if hasChildren { |
|||
h.writeError(w, http.StatusConflict, ErrCodeBucketNotEmpty, "table bucket is not empty") |
|||
return fmt.Errorf("bucket not empty") |
|||
} |
|||
|
|||
// Delete the bucket
|
|||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|||
return h.deleteDirectory(client, bucketPath) |
|||
}) |
|||
|
|||
if err != nil { |
|||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table bucket") |
|||
return err |
|||
} |
|||
|
|||
h.writeJSON(w, http.StatusOK, nil) |
|||
return nil |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue