diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 9f658aaaa..d7f5aa7b8 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -624,17 +624,6 @@ func (s3a *S3ApiServer) AuthWithPublicRead(handler http.HandlerFunc, action Acti glog.V(4).Infof("AuthWithPublicRead: bucket=%s, object=%s, authType=%v, isAnonymous=%v", bucket, object, authType, isAnonymous) - // Allow anonymous access for SOSAPI virtual objects (discovery) - if isSOSAPIObject(object) { - // Ensure the bucket exists anyway - _, errCode := s3a.getBucketConfig(bucket) - if errCode == s3err.ErrNone { - glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to SOSAPI object %s in bucket %s", object, bucket) - handler(w, r) - return - } - } - // For anonymous requests, check if bucket allows public read via ACLs or bucket policies if isAnonymous { // First check ACL-based public access diff --git a/weed/s3api/s3api_sosapi.go b/weed/s3api/s3api_sosapi.go index e8edd9cb3..f12add728 100644 --- a/weed/s3api/s3api_sosapi.go +++ b/weed/s3api/s3api_sosapi.go @@ -5,11 +5,13 @@ package s3api import ( + "bytes" "context" "crypto/md5" "encoding/hex" "encoding/xml" - "io" + "errors" + "fmt" "net/http" "strconv" "strings" @@ -17,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util/version" @@ -123,18 +126,13 @@ func generateSystemXML() ([]byte, error) { } // generateCapacityXML creates the capacity.xml response containing real-time -// storage capacity information retrieved from the master server. -func (s3a *S3ApiServer) generateCapacityXML(ctx context.Context) ([]byte, error) { - total, used, err := s3a.getClusterCapacity(ctx) +// storage capacity information. +func (s3a *S3ApiServer) generateCapacityXML(ctx context.Context, bucket string) ([]byte, error) { + total, available, used, err := s3a.getCapacityInfo(ctx, bucket) if err != nil { - glog.Warningf("SOSAPI: failed to get cluster capacity: %v, using defaults", err) - // Return zero capacity on error - clients will handle gracefully - total, used = 0, 0 - } - - available := total - used - if available < 0 { - available = 0 + glog.Warningf("SOSAPI: failed to get capacity info for bucket %s: %v, using defaults", bucket, err) + // Return zero capacity on error + total, available, used = 0, 0, 0 } ci := CapacityInfo{ @@ -146,26 +144,98 @@ func (s3a *S3ApiServer) generateCapacityXML(ctx context.Context) ([]byte, error) return xml.Marshal(&ci) } -// getClusterCapacity retrieves the total and used storage capacity from the master server. -func (s3a *S3ApiServer) getClusterCapacity(ctx context.Context) (total, used int64, err error) { - // Get the current filer address, then use it to connect to master - filerAddress := s3a.getFilerAddress() - if filerAddress == "" { - return 0, 0, nil +// getCapacityInfo retrieves capacity information for the specific bucket. +// It checks bucket quota first, then falls back to cluster topology information. +// Returns capacity, available, and used bytes. +func (s3a *S3ApiServer) getCapacityInfo(ctx context.Context, bucket string) (capacity, available, used int64, err error) { + // 1. Check if bucket has a quota + // We use s3a.getEntry which is a helper in s3api_bucket_handlers.go + var quota int64 + // getEntry communicates with filer, so errors here might mean filer connectivity issues or bucket not found + // If bucket not found, we probably shouldn't be here (checked in handler), but safe to ignore + if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil && entry != nil { + quota = entry.Quota + } + + // 2. Get cluster topology from master + if len(s3a.option.Masters) == 0 { + return 0, 0, 0, fmt.Errorf("no master servers configured") + } + + masterMap := make(map[string]pb.ServerAddress) + for _, master := range s3a.option.Masters { + masterMap[string(master)] = master } - // Use the filer client to get master information and call statistics - err = pb.WithMasterClient(false, filerAddress, s3a.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, statsErr := client.Statistics(ctx, &master_pb.StatisticsRequest{}) - if statsErr != nil { - return statsErr + // Connect to any available master and get volume list (topology) + err = pb.WithOneOfGrpcMasterClients(false, masterMap, s3a.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + resp, vErr := client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + if vErr != nil { + return vErr + } + + if resp.TopologyInfo == nil { + return nil + } + + // Calculate used size for the bucket by summing up volumes in the collection + used = collectBucketUsageFromTopology(resp.TopologyInfo, s3a.getCollectionName(bucket)) + + // Calculate cluster capacity if no quota + if quota > 0 { + capacity = quota + available = quota - used + if available < 0 { + available = 0 + } + } else { + // No quota - use cluster capacity + clusterTotal, clusterAvailable := calculateClusterCapacity(resp.TopologyInfo, resp.VolumeSizeLimitMb) + capacity = clusterTotal + available = clusterAvailable } - total = int64(resp.TotalSize) - used = int64(resp.UsedSize) return nil }) - return total, used, err + return capacity, available, used, err +} + +// collectBucketUsageFromTopology sums up the size of all volumes belonging to the specified collection. +func collectBucketUsageFromTopology(t *master_pb.TopologyInfo, collectionName string) (used int64) { + seenVolumes := make(map[uint32]bool) + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, disk := range dn.DiskInfos { + for _, vi := range disk.VolumeInfos { + if vi.Collection == collectionName { + if !seenVolumes[vi.Id] { + used += int64(vi.Size) + seenVolumes[vi.Id] = true + } + } + } + } + } + } + } + return +} + +// calculateClusterCapacity sums up the total and available capacity of the entire cluster. +func calculateClusterCapacity(t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) (total, available int64) { + volumeSize := int64(volumeSizeLimitMb) * 1024 * 1024 + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, disk := range dn.DiskInfos { + total += int64(disk.MaxVolumeCount) * volumeSize + available += int64(disk.FreeVolumeCount) * volumeSize + } + } + } + } + return } // handleSOSAPIGetObject handles GET requests for SOSAPI virtual objects. @@ -175,62 +245,27 @@ func (s3a *S3ApiServer) handleSOSAPIGetObject(w http.ResponseWriter, r *http.Req return false } - var xmlData []byte - var err error - - // Verify bucket exists - if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, r, errCode) - return true - } - - switch object { - case sosAPISystemXML: - xmlData, err = generateSystemXML() - if err != nil { - glog.Errorf("SOSAPI: failed to generate system.xml: %v", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return true - } - glog.V(2).Infof("SOSAPI: serving system.xml for bucket %s", bucket) - - case sosAPICapacityXML: - xmlData, err = s3a.generateCapacityXML(r.Context()) - if err != nil { - glog.Errorf("SOSAPI: failed to generate capacity.xml: %v", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return true + xmlData, err := s3a.generateSOSAPIContent(r.Context(), bucket, object) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + } else { + glog.Errorf("SOSAPI: failed to generate %s: %v", object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } - glog.V(2).Infof("SOSAPI: serving capacity.xml for bucket %s", bucket) - - default: - return false + return true } - // Prepend XML declaration - xmlData = append([]byte(xml.Header), xmlData...) - // Calculate ETag from content hash := md5.Sum(xmlData) etag := hex.EncodeToString(hash[:]) - // Set response headers - w.Header().Set("Content-Type", "application/xml") + // Set ETag header manually as ServeContent doesn't calculate it automatically w.Header().Set("ETag", "\""+etag+"\"") - w.Header().Set("Content-Length", strconv.Itoa(len(xmlData))) - w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) - - // Handle Range requests if present - rangeHeader := r.Header.Get("Range") - if rangeHeader != "" { - // Simple range handling for SOSAPI objects - s3a.serveSOSAPIRange(w, r, xmlData, etag) - return true - } + w.Header().Set("Content-Type", "application/xml") - // Write full response - w.WriteHeader(http.StatusOK) - w.Write(xmlData) + // Use http.ServeContent to handle Content-Length, Range, and Last-Modified + http.ServeContent(w, r, object, time.Now().UTC(), bytes.NewReader(xmlData)) return true } @@ -242,41 +277,17 @@ func (s3a *S3ApiServer) handleSOSAPIHeadObject(w http.ResponseWriter, r *http.Re return false } - var xmlData []byte - var err error - - // Verify bucket exists - if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, r, errCode) - return true - } - - switch object { - case sosAPISystemXML: - xmlData, err = generateSystemXML() - if err != nil { - glog.Errorf("SOSAPI: failed to generate system.xml for HEAD: %v", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return true - } - glog.V(2).Infof("SOSAPI: HEAD system.xml for bucket %s", bucket) - - case sosAPICapacityXML: - xmlData, err = s3a.generateCapacityXML(r.Context()) - if err != nil { - glog.Errorf("SOSAPI: failed to generate capacity.xml for HEAD: %v", err) - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return true + xmlData, err := s3a.generateSOSAPIContent(r.Context(), bucket, object) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + } else { + glog.Errorf("SOSAPI: failed to generate %s for HEAD: %v", object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } - glog.V(2).Infof("SOSAPI: HEAD capacity.xml for bucket %s", bucket) - - default: - return false + return true } - // Prepend XML declaration for accurate size calculation - xmlData = append([]byte(xml.Header), xmlData...) - // Calculate ETag from content hash := md5.Sum(xmlData) etag := hex.EncodeToString(hash[:]) @@ -291,60 +302,41 @@ func (s3a *S3ApiServer) handleSOSAPIHeadObject(w http.ResponseWriter, r *http.Re return true } -// serveSOSAPIRange handles Range requests for SOSAPI objects. -func (s3a *S3ApiServer) serveSOSAPIRange(w http.ResponseWriter, r *http.Request, data []byte, etag string) { - rangeHeader := r.Header.Get("Range") - if !strings.HasPrefix(rangeHeader, "bytes=") { - http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable) - return - } - - // Parse simple range like "bytes=0-99" - rangeSpec := strings.TrimPrefix(rangeHeader, "bytes=") - parts := strings.Split(rangeSpec, "-") - if len(parts) != 2 { - http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable) - return +// generateSOSAPIContent generates the XML content for SOSAPI virtual objects. +// Returns the complete XML with declaration prepended. +func (s3a *S3ApiServer) generateSOSAPIContent(ctx context.Context, bucket, object string) ([]byte, error) { + // Verify bucket exists + if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone { + if errCode == s3err.ErrNoSuchBucket { + return nil, filer_pb.ErrNotFound + } + return nil, fmt.Errorf("bucket config error: %v", errCode) } - var start, end int64 - size := int64(len(data)) + var xmlData []byte + var err error - if parts[0] == "" { - // Suffix range: -N means last N bytes - var n int64 - if _, err := io.ReadFull(strings.NewReader(parts[1]), make([]byte, 0)); err == nil { - // Parse suffix length - n = size // fallback to full content - } - start = size - n - if start < 0 { - start = 0 + switch object { + case sosAPISystemXML: + xmlData, err = generateSystemXML() + if err != nil { + return nil, err } - end = size - 1 - } else { - // Normal range: start-end - start = 0 - end = size - 1 - // Simple parsing - in production would need proper int parsing - } + glog.V(4).Infof("SOSAPI: generated system.xml for bucket %s", bucket) - if start > end || start >= size { - http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable) - return - } + case sosAPICapacityXML: + xmlData, err = s3a.generateCapacityXML(ctx, bucket) + if err != nil { + return nil, err + } + glog.V(4).Infof("SOSAPI: generated capacity.xml for bucket %s", bucket) - if end >= size { - end = size - 1 + default: + return nil, fmt.Errorf("unknown SOSAPI object: %s", object) } - // Set partial content headers - w.Header().Set("Content-Type", "application/xml") - w.Header().Set("ETag", "\""+etag+"\"") - w.Header().Set("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+strconv.FormatInt(size, 10)) - w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10)) - w.WriteHeader(http.StatusPartialContent) + // Prepend XML declaration + xmlData = append([]byte(xml.Header), xmlData...) - // Write the requested range - w.Write(data[start : end+1]) + return xmlData, nil } diff --git a/weed/s3api/s3api_sosapi_test.go b/weed/s3api/s3api_sosapi_test.go index 691b41ff9..c14bd16f6 100644 --- a/weed/s3api/s3api_sosapi_test.go +++ b/weed/s3api/s3api_sosapi_test.go @@ -5,6 +5,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) func TestIsSOSAPIObject(t *testing.T) { @@ -134,91 +136,7 @@ func TestGenerateSystemXML(t *testing.T) { } } -func TestCapacityInfoXMLStruct(t *testing.T) { - // Test that CapacityInfo can be marshaled correctly - ci := CapacityInfo{ - Capacity: 1000000, - Available: 800000, - Used: 200000, - } - - xmlData, err := xml.Marshal(&ci) - if err != nil { - t.Fatalf("xml.Marshal failed: %v", err) - } - - // Verify roundtrip - var parsed CapacityInfo - if err := xml.Unmarshal(xmlData, &parsed); err != nil { - t.Fatalf("xml.Unmarshal failed: %v", err) - } - - if parsed.Capacity != ci.Capacity { - t.Errorf("Capacity = %d, want %d", parsed.Capacity, ci.Capacity) - } - if parsed.Available != ci.Available { - t.Errorf("Available = %d, want %d", parsed.Available, ci.Available) - } - if parsed.Used != ci.Used { - t.Errorf("Used = %d, want %d", parsed.Used, ci.Used) - } -} - -func TestSOSAPIConstants(t *testing.T) { - // Verify constants are correctly set - if !strings.HasPrefix(sosAPISystemXML, sosAPISystemFolder) { - t.Errorf("sosAPISystemXML should start with sosAPISystemFolder") - } - - if !strings.HasPrefix(sosAPICapacityXML, sosAPISystemFolder) { - t.Errorf("sosAPICapacityXML should start with sosAPISystemFolder") - } - - if !strings.HasSuffix(sosAPISystemXML, "system.xml") { - t.Errorf("sosAPISystemXML should end with 'system.xml'") - } - - if !strings.HasSuffix(sosAPICapacityXML, "capacity.xml") { - t.Errorf("sosAPICapacityXML should end with 'capacity.xml'") - } - - // Protocol version should be quoted per SOSAPI spec - if !strings.HasPrefix(sosAPIProtocolVersion, "\"") || !strings.HasSuffix(sosAPIProtocolVersion, "\"") { - t.Errorf("sosAPIProtocolVersion should be quoted, got: %s", sosAPIProtocolVersion) - } -} - -func TestSystemInfoXMLRootElement(t *testing.T) { - xmlData, err := generateSystemXML() - if err != nil { - t.Fatalf("generateSystemXML() failed: %v", err) - } - - xmlStr := string(xmlData) - - // Verify root element name - if !strings.Contains(xmlStr, "") { - t.Error("XML should contain root element") - } - - // Verify required elements - requiredElements := []string{ - "", - "", - "", - "", - } - - for _, elem := range requiredElements { - if !strings.Contains(xmlStr, elem) { - t.Errorf("XML should contain %s element", elem) - } - } -} - -// TestSOSAPIHandlerIntegration tests the basic handler flow without a full server func TestSOSAPIObjectDetectionEdgeCases(t *testing.T) { - // Test various edge cases for object detection edgeCases := []struct { object string expected bool @@ -244,32 +162,87 @@ func TestSOSAPIObjectDetectionEdgeCases(t *testing.T) { } } -// TestSOSAPIHandlerReturnsXMLContentType verifies content-type setting logic -func TestSOSAPIXMLContentType(t *testing.T) { - // Create a mock response writer to check headers - w := httptest.NewRecorder() +func TestCollectBucketUsageFromTopology(t *testing.T) { + topo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + RackInfos: []*master_pb.RackInfo{ + { + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + VolumeInfos: []*master_pb.VolumeInformationMessage{ + {Id: 1, Size: 100, Collection: "bucket1"}, + {Id: 2, Size: 200, Collection: "bucket2"}, + {Id: 3, Size: 300, Collection: "bucket1"}, + {Id: 1, Size: 100, Collection: "bucket1"}, // Duplicate (replica), should be ignored + }, + }, + }, + }, + }, + }, + }, + }, + }, + } - // Simulate what the handler should set - w.Header().Set("Content-Type", "application/xml") + usage := collectBucketUsageFromTopology(topo, "bucket1") + expected := int64(400) // 100 + 300 + if usage != expected { + t.Errorf("collectBucketUsageFromTopology = %d, want %d", usage, expected) + } - contentType := w.Header().Get("Content-Type") - if contentType != "application/xml" { - t.Errorf("Content-Type = %q, want 'application/xml'", contentType) + usage2 := collectBucketUsageFromTopology(topo, "bucket2") + expected2 := int64(200) + if usage2 != expected2 { + t.Errorf("collectBucketUsageFromTopology = %d, want %d", usage2, expected2) } } -func TestHTTPTimeFormat(t *testing.T) { - // Verify the Last-Modified header format is correct for HTTP - w := httptest.NewRecorder() - w.Header().Set("Last-Modified", "Sat, 28 Dec 2024 20:00:00 GMT") - - lastMod := w.Header().Get("Last-Modified") - if lastMod == "" { - t.Error("Last-Modified header should be set") +func TestCalculateClusterCapacity(t *testing.T) { + topo := &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + RackInfos: []*master_pb.RackInfo{ + { + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + MaxVolumeCount: 100, + FreeVolumeCount: 40, + }, + }, + }, + { + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + MaxVolumeCount: 200, + FreeVolumeCount: 160, + }, + }, + }, + }, + }, + }, + }, + }, } - // HTTP date should contain day of week - if !strings.Contains(lastMod, "Dec") { - t.Errorf("Last-Modified should contain month, got: %s", lastMod) + volumeSizeLimitMb := uint64(1000) // 1GB + volumeSizeBytes := int64(1000) * 1024 * 1024 + + total, available := calculateClusterCapacity(topo, volumeSizeLimitMb) + + expectedTotal := int64(300) * volumeSizeBytes + expectedAvailable := int64(200) * volumeSizeBytes + + if total != expectedTotal { + t.Errorf("calculateClusterCapacity total = %d, want %d", total, expectedTotal) + } + if available != expectedAvailable { + t.Errorf("calculateClusterCapacity available = %d, want %d", available, expectedAvailable) } }