You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
342 lines
11 KiB
342 lines
11 KiB
// Package s3api implements the S3 API for SeaweedFS.
|
|
// This file implements the Smart Object Storage API (SOSAPI) which enables
|
|
// enterprise backup software to automatically discover storage system
|
|
// capabilities and capacity information.
|
|
package s3api
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/version"
|
|
)
|
|
|
|
// SOSAPI constants
|
|
const (
|
|
// sosAPISystemFolder is the well-known folder path for SOSAPI system files.
|
|
// This UUID-based path is part of the SOSAPI specification.
|
|
sosAPISystemFolder = ".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c"
|
|
|
|
// sosAPISystemXML is the path to the system capabilities XML file.
|
|
sosAPISystemXML = sosAPISystemFolder + "/system.xml"
|
|
|
|
// sosAPICapacityXML is the path to the capacity information XML file.
|
|
sosAPICapacityXML = sosAPISystemFolder + "/capacity.xml"
|
|
|
|
// sosAPIClientUserAgent is a substring to detect SOSAPI-compatible backup clients.
|
|
sosAPIClientUserAgent = "APN/1.0 Veeam/1.0"
|
|
|
|
// sosAPIProtocolVersion is the SOSAPI protocol version supported.
|
|
sosAPIProtocolVersion = `"1.0"`
|
|
|
|
// sosAPIDefaultBlockSizeKB is the recommended block size in KB.
|
|
// 4096 KB (4MB) is optimal for object storage workloads.
|
|
sosAPIDefaultBlockSizeKB = 4096
|
|
)
|
|
|
|
// SystemInfo represents the system.xml response structure for SOSAPI.
|
|
// It describes the storage system's capabilities and recommendations.
|
|
type SystemInfo struct {
|
|
XMLName xml.Name `xml:"SystemInfo"`
|
|
ProtocolVersion string `xml:"ProtocolVersion"`
|
|
ModelName string `xml:"ModelName"`
|
|
ProtocolCapabilities struct {
|
|
CapacityInfo bool `xml:"CapacityInfo"`
|
|
UploadSessions bool `xml:"UploadSessions"`
|
|
IAMSTS bool `xml:"IAMSTS"`
|
|
} `xml:"ProtocolCapabilities"`
|
|
APIEndpoints *APIEndpoints `xml:"APIEndpoints,omitempty"`
|
|
SystemRecommendations *SystemRecommendations `xml:"SystemRecommendations,omitempty"`
|
|
}
|
|
|
|
// APIEndpoints contains optional IAM and STS endpoint information.
|
|
type APIEndpoints struct {
|
|
IAMEndpoint string `xml:"IAMEndpoint,omitempty"`
|
|
STSEndpoint string `xml:"STSEndpoint,omitempty"`
|
|
}
|
|
|
|
// SystemRecommendations contains storage system performance recommendations.
|
|
type SystemRecommendations struct {
|
|
S3ConcurrentTaskLimit int `xml:"S3ConcurrentTaskLimit,omitempty"`
|
|
S3MultiObjectDeleteLimit int `xml:"S3MultiObjectDeleteLimit,omitempty"`
|
|
StorageCurrentTaskLimit int `xml:"StorageCurrentTaskLimit,omitempty"`
|
|
KBBlockSize int `xml:"KbBlockSize"`
|
|
}
|
|
|
|
// CapacityInfo represents the capacity.xml response structure for SOSAPI.
|
|
// It provides real-time storage capacity information.
|
|
type CapacityInfo struct {
|
|
XMLName xml.Name `xml:"CapacityInfo"`
|
|
Capacity int64 `xml:"Capacity"`
|
|
Available int64 `xml:"Available"`
|
|
Used int64 `xml:"Used"`
|
|
}
|
|
|
|
// isSOSAPIObject checks if the given object path is a SOSAPI virtual object.
|
|
// These objects don't physically exist but are generated on-demand.
|
|
func isSOSAPIObject(object string) bool {
|
|
switch object {
|
|
case sosAPISystemXML, sosAPICapacityXML:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// isSOSAPIClient checks if the request comes from a SOSAPI-compatible client
|
|
// by examining the User-Agent header.
|
|
func isSOSAPIClient(r *http.Request) bool {
|
|
userAgent := r.Header.Get("User-Agent")
|
|
return strings.Contains(userAgent, sosAPIClientUserAgent)
|
|
}
|
|
|
|
// generateSystemXML creates the system.xml response containing storage system
|
|
// capabilities and recommendations.
|
|
func generateSystemXML() ([]byte, error) {
|
|
si := SystemInfo{
|
|
ProtocolVersion: sosAPIProtocolVersion,
|
|
ModelName: "\"SeaweedFS " + version.VERSION_NUMBER + "\"",
|
|
}
|
|
|
|
// Enable capacity reporting capability
|
|
si.ProtocolCapabilities.CapacityInfo = true
|
|
si.ProtocolCapabilities.UploadSessions = false
|
|
si.ProtocolCapabilities.IAMSTS = false
|
|
|
|
// Set recommended block size for optimal performance
|
|
si.SystemRecommendations = &SystemRecommendations{
|
|
KBBlockSize: sosAPIDefaultBlockSizeKB,
|
|
}
|
|
|
|
return xml.Marshal(&si)
|
|
}
|
|
|
|
// generateCapacityXML creates the capacity.xml response containing real-time
|
|
// storage capacity information.
|
|
func (s3a *S3ApiServer) generateCapacityXML(ctx context.Context, bucket string) ([]byte, error) {
|
|
total, available, used, err := s3a.getCapacityInfo(ctx, bucket)
|
|
if err != nil {
|
|
glog.Warningf("SOSAPI: failed to get capacity info for bucket %s: %v, using defaults", bucket, err)
|
|
// Return zero capacity on error
|
|
total, available, used = 0, 0, 0
|
|
}
|
|
|
|
ci := CapacityInfo{
|
|
Capacity: total,
|
|
Available: available,
|
|
Used: used,
|
|
}
|
|
|
|
return xml.Marshal(&ci)
|
|
}
|
|
|
|
// getCapacityInfo retrieves capacity information for the specific bucket.
|
|
// It checks bucket quota first, then falls back to cluster topology information.
|
|
// Returns capacity, available, and used bytes.
|
|
func (s3a *S3ApiServer) getCapacityInfo(ctx context.Context, bucket string) (capacity, available, used int64, err error) {
|
|
// 1. Check if bucket has a quota
|
|
// We use s3a.getEntry which is a helper in s3api_bucket_handlers.go
|
|
var quota int64
|
|
// getEntry communicates with filer, so errors here might mean filer connectivity issues or bucket not found
|
|
// If bucket not found, we probably shouldn't be here (checked in handler), but safe to ignore
|
|
if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil && entry != nil {
|
|
quota = entry.Quota
|
|
}
|
|
|
|
// 2. Get cluster topology from master
|
|
if len(s3a.option.Masters) == 0 {
|
|
return 0, 0, 0, fmt.Errorf("no master servers configured")
|
|
}
|
|
|
|
masterMap := make(map[string]pb.ServerAddress)
|
|
for _, master := range s3a.option.Masters {
|
|
masterMap[string(master)] = master
|
|
}
|
|
|
|
// Connect to any available master and get volume list (topology)
|
|
err = pb.WithOneOfGrpcMasterClients(false, masterMap, s3a.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
|
|
resp, vErr := client.VolumeList(ctx, &master_pb.VolumeListRequest{})
|
|
if vErr != nil {
|
|
return vErr
|
|
}
|
|
|
|
if resp.TopologyInfo == nil {
|
|
return nil
|
|
}
|
|
|
|
// Calculate used size for the bucket by summing up volumes in the collection
|
|
used = collectBucketUsageFromTopology(resp.TopologyInfo, s3a.getCollectionName(bucket))
|
|
|
|
// Calculate cluster capacity if no quota
|
|
if quota > 0 {
|
|
capacity = quota
|
|
available = quota - used
|
|
if available < 0 {
|
|
available = 0
|
|
}
|
|
} else {
|
|
// No quota - use cluster capacity
|
|
clusterTotal, clusterAvailable := calculateClusterCapacity(resp.TopologyInfo, resp.VolumeSizeLimitMb)
|
|
capacity = clusterTotal
|
|
available = clusterAvailable
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return capacity, available, used, err
|
|
}
|
|
|
|
// collectBucketUsageFromTopology sums up the size of all volumes belonging to the specified collection.
|
|
func collectBucketUsageFromTopology(t *master_pb.TopologyInfo, collectionName string) (used int64) {
|
|
seenVolumes := make(map[uint32]bool)
|
|
for _, dc := range t.DataCenterInfos {
|
|
for _, r := range dc.RackInfos {
|
|
for _, dn := range r.DataNodeInfos {
|
|
for _, disk := range dn.DiskInfos {
|
|
for _, vi := range disk.VolumeInfos {
|
|
if vi.Collection == collectionName {
|
|
if !seenVolumes[vi.Id] {
|
|
used += int64(vi.Size)
|
|
seenVolumes[vi.Id] = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// calculateClusterCapacity sums up the total and available capacity of the entire cluster.
|
|
func calculateClusterCapacity(t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) (total, available int64) {
|
|
volumeSize := int64(volumeSizeLimitMb) * 1024 * 1024
|
|
for _, dc := range t.DataCenterInfos {
|
|
for _, r := range dc.RackInfos {
|
|
for _, dn := range r.DataNodeInfos {
|
|
for _, disk := range dn.DiskInfos {
|
|
total += int64(disk.MaxVolumeCount) * volumeSize
|
|
available += int64(disk.FreeVolumeCount) * volumeSize
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// handleSOSAPIGetObject handles GET requests for SOSAPI virtual objects.
|
|
// Returns true if the request was handled, false if it should proceed normally.
|
|
func (s3a *S3ApiServer) handleSOSAPIGetObject(w http.ResponseWriter, r *http.Request, bucket, object string) bool {
|
|
if !isSOSAPIObject(object) {
|
|
return false
|
|
}
|
|
|
|
xmlData, err := s3a.generateSOSAPIContent(r.Context(), bucket, object)
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
|
} else {
|
|
glog.Errorf("SOSAPI: failed to generate %s: %v", object, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Calculate ETag from content
|
|
hash := md5.Sum(xmlData)
|
|
etag := hex.EncodeToString(hash[:])
|
|
|
|
// Set ETag header manually as ServeContent doesn't calculate it automatically
|
|
w.Header().Set("ETag", "\""+etag+"\"")
|
|
w.Header().Set("Content-Type", "application/xml")
|
|
|
|
// Use http.ServeContent to handle Content-Length, Range, and Last-Modified
|
|
http.ServeContent(w, r, object, time.Now().UTC(), bytes.NewReader(xmlData))
|
|
|
|
return true
|
|
}
|
|
|
|
// handleSOSAPIHeadObject handles HEAD requests for SOSAPI virtual objects.
|
|
// Returns true if the request was handled, false if it should proceed normally.
|
|
func (s3a *S3ApiServer) handleSOSAPIHeadObject(w http.ResponseWriter, r *http.Request, bucket, object string) bool {
|
|
if !isSOSAPIObject(object) {
|
|
return false
|
|
}
|
|
|
|
xmlData, err := s3a.generateSOSAPIContent(r.Context(), bucket, object)
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
|
} else {
|
|
glog.Errorf("SOSAPI: failed to generate %s for HEAD: %v", object, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Calculate ETag from content
|
|
hash := md5.Sum(xmlData)
|
|
etag := hex.EncodeToString(hash[:])
|
|
|
|
// Set response headers (no body for HEAD)
|
|
w.Header().Set("Content-Type", "application/xml")
|
|
w.Header().Set("ETag", "\""+etag+"\"")
|
|
w.Header().Set("Content-Length", strconv.Itoa(len(xmlData)))
|
|
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
return true
|
|
}
|
|
|
|
// generateSOSAPIContent generates the XML content for SOSAPI virtual objects.
|
|
// Returns the complete XML with declaration prepended.
|
|
func (s3a *S3ApiServer) generateSOSAPIContent(ctx context.Context, bucket, object string) ([]byte, error) {
|
|
// Verify bucket exists
|
|
if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return nil, filer_pb.ErrNotFound
|
|
}
|
|
return nil, fmt.Errorf("bucket config error: %v", errCode)
|
|
}
|
|
|
|
var xmlData []byte
|
|
var err error
|
|
|
|
switch object {
|
|
case sosAPISystemXML:
|
|
xmlData, err = generateSystemXML()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
glog.V(4).Infof("SOSAPI: generated system.xml for bucket %s", bucket)
|
|
|
|
case sosAPICapacityXML:
|
|
xmlData, err = s3a.generateCapacityXML(ctx, bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
glog.V(4).Infof("SOSAPI: generated capacity.xml for bucket %s", bucket)
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown SOSAPI object: %s", object)
|
|
}
|
|
|
|
// Prepend XML declaration
|
|
xmlData = append([]byte(xml.Header), xmlData...)
|
|
|
|
return xmlData, nil
|
|
}
|