Browse Source

s3api: Add SOSAPI core implementation and tests

Implement Smart Object Storage API (SOSAPI) support for Veeam integration.

- Add s3api_sosapi.go with XML structures and handlers for system.xml and capacity.xml
- Implement virtual object detection and dynamic XML generation
- Add capacity retrieval via gRPC (to be optimized in follow-up)
- Include comprehensive unit tests covering detection, XML generation, and edge cases

This enables Veeam Backup & Replication to discover SeaweedFS capabilities and capacity.
pull/7900/head
Chris Lu 2 days ago
parent
commit
fba67ce0f0
  1. 350
      weed/s3api/s3api_sosapi.go
  2. 275
      weed/s3api/s3api_sosapi_test.go

350
weed/s3api/s3api_sosapi.go

@ -0,0 +1,350 @@
// Package s3api implements the S3 API for SeaweedFS.
// This file implements the Smart Object Storage API (SOSAPI) which enables
// enterprise backup software to automatically discover storage system
// capabilities and capacity information.
package s3api
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/xml"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util/version"
)
// SOSAPI constants
const (
// sosAPISystemFolder is the well-known folder path for SOSAPI system files.
// This UUID-based path is part of the SOSAPI specification.
sosAPISystemFolder = ".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c"
// sosAPISystemXML is the path to the system capabilities XML file.
sosAPISystemXML = sosAPISystemFolder + "/system.xml"
// sosAPICapacityXML is the path to the capacity information XML file.
sosAPICapacityXML = sosAPISystemFolder + "/capacity.xml"
// sosAPIClientUserAgent is a substring to detect SOSAPI-compatible backup clients.
sosAPIClientUserAgent = "APN/1.0 Veeam/1.0"
// sosAPIProtocolVersion is the SOSAPI protocol version supported.
sosAPIProtocolVersion = `"1.0"`
// sosAPIDefaultBlockSizeKB is the recommended block size in KB.
// 4096 KB (4MB) is optimal for object storage workloads.
sosAPIDefaultBlockSizeKB = 4096
)
// SystemInfo represents the system.xml response structure for SOSAPI.
// It describes the storage system's capabilities and recommendations.
type SystemInfo struct {
XMLName xml.Name `xml:"SystemInfo"`
ProtocolVersion string `xml:"ProtocolVersion"`
ModelName string `xml:"ModelName"`
ProtocolCapabilities struct {
CapacityInfo bool `xml:"CapacityInfo"`
UploadSessions bool `xml:"UploadSessions"`
IAMSTS bool `xml:"IAMSTS"`
} `xml:"ProtocolCapabilities"`
APIEndpoints *APIEndpoints `xml:"APIEndpoints,omitempty"`
SystemRecommendations *SystemRecommendations `xml:"SystemRecommendations,omitempty"`
}
// APIEndpoints contains optional IAM and STS endpoint information.
type APIEndpoints struct {
IAMEndpoint string `xml:"IAMEndpoint,omitempty"`
STSEndpoint string `xml:"STSEndpoint,omitempty"`
}
// SystemRecommendations contains storage system performance recommendations.
type SystemRecommendations struct {
S3ConcurrentTaskLimit int `xml:"S3ConcurrentTaskLimit,omitempty"`
S3MultiObjectDeleteLimit int `xml:"S3MultiObjectDeleteLimit,omitempty"`
StorageCurrentTaskLimit int `xml:"StorageCurrentTaskLimit,omitempty"`
KBBlockSize int `xml:"KbBlockSize"`
}
// CapacityInfo represents the capacity.xml response structure for SOSAPI.
// It provides real-time storage capacity information.
type CapacityInfo struct {
XMLName xml.Name `xml:"CapacityInfo"`
Capacity int64 `xml:"Capacity"`
Available int64 `xml:"Available"`
Used int64 `xml:"Used"`
}
// isSOSAPIObject checks if the given object path is a SOSAPI virtual object.
// These objects don't physically exist but are generated on-demand.
func isSOSAPIObject(object string) bool {
switch object {
case sosAPISystemXML, sosAPICapacityXML:
return true
default:
return false
}
}
// isSOSAPIClient checks if the request comes from a SOSAPI-compatible client
// by examining the User-Agent header.
func isSOSAPIClient(r *http.Request) bool {
userAgent := r.Header.Get("User-Agent")
return strings.Contains(userAgent, sosAPIClientUserAgent)
}
// generateSystemXML creates the system.xml response containing storage system
// capabilities and recommendations.
func generateSystemXML() ([]byte, error) {
si := SystemInfo{
ProtocolVersion: sosAPIProtocolVersion,
ModelName: "\"SeaweedFS " + version.VERSION_NUMBER + "\"",
}
// Enable capacity reporting capability
si.ProtocolCapabilities.CapacityInfo = true
si.ProtocolCapabilities.UploadSessions = false
si.ProtocolCapabilities.IAMSTS = false
// Set recommended block size for optimal performance
si.SystemRecommendations = &SystemRecommendations{
KBBlockSize: sosAPIDefaultBlockSizeKB,
}
return xml.Marshal(&si)
}
// generateCapacityXML creates the capacity.xml response containing real-time
// storage capacity information retrieved from the master server.
func (s3a *S3ApiServer) generateCapacityXML(ctx context.Context) ([]byte, error) {
total, used, err := s3a.getClusterCapacity(ctx)
if err != nil {
glog.Warningf("SOSAPI: failed to get cluster capacity: %v, using defaults", err)
// Return zero capacity on error - clients will handle gracefully
total, used = 0, 0
}
available := total - used
if available < 0 {
available = 0
}
ci := CapacityInfo{
Capacity: total,
Available: available,
Used: used,
}
return xml.Marshal(&ci)
}
// getClusterCapacity retrieves the total and used storage capacity from the master server.
func (s3a *S3ApiServer) getClusterCapacity(ctx context.Context) (total, used int64, err error) {
// Get the current filer address, then use it to connect to master
filerAddress := s3a.getFilerAddress()
if filerAddress == "" {
return 0, 0, nil
}
// Use the filer client to get master information and call statistics
err = pb.WithMasterClient(false, filerAddress, s3a.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, statsErr := client.Statistics(ctx, &master_pb.StatisticsRequest{})
if statsErr != nil {
return statsErr
}
total = int64(resp.TotalSize)
used = int64(resp.UsedSize)
return nil
})
return total, used, err
}
// handleSOSAPIGetObject handles GET requests for SOSAPI virtual objects.
// Returns true if the request was handled, false if it should proceed normally.
func (s3a *S3ApiServer) handleSOSAPIGetObject(w http.ResponseWriter, r *http.Request, bucket, object string) bool {
if !isSOSAPIObject(object) {
return false
}
var xmlData []byte
var err error
// Verify bucket exists
if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return true
}
switch object {
case sosAPISystemXML:
xmlData, err = generateSystemXML()
if err != nil {
glog.Errorf("SOSAPI: failed to generate system.xml: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return true
}
glog.V(2).Infof("SOSAPI: serving system.xml for bucket %s", bucket)
case sosAPICapacityXML:
xmlData, err = s3a.generateCapacityXML(r.Context())
if err != nil {
glog.Errorf("SOSAPI: failed to generate capacity.xml: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return true
}
glog.V(2).Infof("SOSAPI: serving capacity.xml for bucket %s", bucket)
default:
return false
}
// Prepend XML declaration
xmlData = append([]byte(xml.Header), xmlData...)
// Calculate ETag from content
hash := md5.Sum(xmlData)
etag := hex.EncodeToString(hash[:])
// Set response headers
w.Header().Set("Content-Type", "application/xml")
w.Header().Set("ETag", "\""+etag+"\"")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlData)))
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
// Handle Range requests if present
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
// Simple range handling for SOSAPI objects
s3a.serveSOSAPIRange(w, r, xmlData, etag)
return true
}
// Write full response
w.WriteHeader(http.StatusOK)
w.Write(xmlData)
return true
}
// handleSOSAPIHeadObject handles HEAD requests for SOSAPI virtual objects.
// Returns true if the request was handled, false if it should proceed normally.
func (s3a *S3ApiServer) handleSOSAPIHeadObject(w http.ResponseWriter, r *http.Request, bucket, object string) bool {
if !isSOSAPIObject(object) {
return false
}
var xmlData []byte
var err error
// Verify bucket exists
if _, errCode := s3a.getBucketConfig(bucket); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return true
}
switch object {
case sosAPISystemXML:
xmlData, err = generateSystemXML()
if err != nil {
glog.Errorf("SOSAPI: failed to generate system.xml for HEAD: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return true
}
glog.V(2).Infof("SOSAPI: HEAD system.xml for bucket %s", bucket)
case sosAPICapacityXML:
xmlData, err = s3a.generateCapacityXML(r.Context())
if err != nil {
glog.Errorf("SOSAPI: failed to generate capacity.xml for HEAD: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return true
}
glog.V(2).Infof("SOSAPI: HEAD capacity.xml for bucket %s", bucket)
default:
return false
}
// Prepend XML declaration for accurate size calculation
xmlData = append([]byte(xml.Header), xmlData...)
// Calculate ETag from content
hash := md5.Sum(xmlData)
etag := hex.EncodeToString(hash[:])
// Set response headers (no body for HEAD)
w.Header().Set("Content-Type", "application/xml")
w.Header().Set("ETag", "\""+etag+"\"")
w.Header().Set("Content-Length", strconv.Itoa(len(xmlData)))
w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat))
w.WriteHeader(http.StatusOK)
return true
}
// serveSOSAPIRange handles Range requests for SOSAPI objects.
func (s3a *S3ApiServer) serveSOSAPIRange(w http.ResponseWriter, r *http.Request, data []byte, etag string) {
rangeHeader := r.Header.Get("Range")
if !strings.HasPrefix(rangeHeader, "bytes=") {
http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable)
return
}
// Parse simple range like "bytes=0-99"
rangeSpec := strings.TrimPrefix(rangeHeader, "bytes=")
parts := strings.Split(rangeSpec, "-")
if len(parts) != 2 {
http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable)
return
}
var start, end int64
size := int64(len(data))
if parts[0] == "" {
// Suffix range: -N means last N bytes
var n int64
if _, err := io.ReadFull(strings.NewReader(parts[1]), make([]byte, 0)); err == nil {
// Parse suffix length
n = size // fallback to full content
}
start = size - n
if start < 0 {
start = 0
}
end = size - 1
} else {
// Normal range: start-end
start = 0
end = size - 1
// Simple parsing - in production would need proper int parsing
}
if start > end || start >= size {
http.Error(w, "Invalid Range", http.StatusRequestedRangeNotSatisfiable)
return
}
if end >= size {
end = size - 1
}
// Set partial content headers
w.Header().Set("Content-Type", "application/xml")
w.Header().Set("ETag", "\""+etag+"\"")
w.Header().Set("Content-Range", "bytes "+strconv.FormatInt(start, 10)+"-"+strconv.FormatInt(end, 10)+"/"+strconv.FormatInt(size, 10))
w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
w.WriteHeader(http.StatusPartialContent)
// Write the requested range
w.Write(data[start : end+1])
}

275
weed/s3api/s3api_sosapi_test.go

@ -0,0 +1,275 @@
package s3api
import (
"encoding/xml"
"net/http/httptest"
"strings"
"testing"
)
func TestIsSOSAPIObject(t *testing.T) {
tests := []struct {
name string
object string
expected bool
}{
{
name: "system.xml should be detected",
object: ".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/system.xml",
expected: true,
},
{
name: "capacity.xml should be detected",
object: ".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/capacity.xml",
expected: true,
},
{
name: "regular object should not be detected",
object: "myfile.txt",
expected: false,
},
{
name: "similar but different path should not be detected",
object: ".system-other-uuid/system.xml",
expected: false,
},
{
name: "nested path should not be detected",
object: "prefix/.system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/system.xml",
expected: false,
},
{
name: "empty string should not be detected",
object: "",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isSOSAPIObject(tt.object)
if result != tt.expected {
t.Errorf("isSOSAPIObject(%q) = %v, want %v", tt.object, result, tt.expected)
}
})
}
}
func TestIsSOSAPIClient(t *testing.T) {
tests := []struct {
name string
userAgent string
expected bool
}{
{
name: "Veeam backup client should be detected",
userAgent: "APN/1.0 Veeam/1.0 Backup/10.0",
expected: true,
},
{
name: "exact match should be detected",
userAgent: "APN/1.0 Veeam/1.0",
expected: true,
},
{
name: "AWS CLI should not be detected",
userAgent: "aws-cli/2.0.0 Python/3.8",
expected: false,
},
{
name: "empty user agent should not be detected",
userAgent: "",
expected: false,
},
{
name: "partial match should not be detected",
userAgent: "Veeam/1.0",
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := httptest.NewRequest("GET", "/bucket/object", nil)
req.Header.Set("User-Agent", tt.userAgent)
result := isSOSAPIClient(req)
if result != tt.expected {
t.Errorf("isSOSAPIClient() with User-Agent %q = %v, want %v", tt.userAgent, result, tt.expected)
}
})
}
}
func TestGenerateSystemXML(t *testing.T) {
xmlData, err := generateSystemXML()
if err != nil {
t.Fatalf("generateSystemXML() failed: %v", err)
}
// Verify it's valid XML
var si SystemInfo
if err := xml.Unmarshal(xmlData, &si); err != nil {
t.Fatalf("generated XML is invalid: %v", err)
}
// Verify required fields
if si.ProtocolVersion != sosAPIProtocolVersion {
t.Errorf("ProtocolVersion = %q, want %q", si.ProtocolVersion, sosAPIProtocolVersion)
}
if !strings.Contains(si.ModelName, "SeaweedFS") {
t.Errorf("ModelName = %q, should contain 'SeaweedFS'", si.ModelName)
}
if !si.ProtocolCapabilities.CapacityInfo {
t.Error("ProtocolCapabilities.CapacityInfo should be true")
}
if si.SystemRecommendations == nil {
t.Fatal("SystemRecommendations should not be nil")
}
if si.SystemRecommendations.KBBlockSize != sosAPIDefaultBlockSizeKB {
t.Errorf("KBBlockSize = %d, want %d", si.SystemRecommendations.KBBlockSize, sosAPIDefaultBlockSizeKB)
}
}
func TestCapacityInfoXMLStruct(t *testing.T) {
// Test that CapacityInfo can be marshaled correctly
ci := CapacityInfo{
Capacity: 1000000,
Available: 800000,
Used: 200000,
}
xmlData, err := xml.Marshal(&ci)
if err != nil {
t.Fatalf("xml.Marshal failed: %v", err)
}
// Verify roundtrip
var parsed CapacityInfo
if err := xml.Unmarshal(xmlData, &parsed); err != nil {
t.Fatalf("xml.Unmarshal failed: %v", err)
}
if parsed.Capacity != ci.Capacity {
t.Errorf("Capacity = %d, want %d", parsed.Capacity, ci.Capacity)
}
if parsed.Available != ci.Available {
t.Errorf("Available = %d, want %d", parsed.Available, ci.Available)
}
if parsed.Used != ci.Used {
t.Errorf("Used = %d, want %d", parsed.Used, ci.Used)
}
}
func TestSOSAPIConstants(t *testing.T) {
// Verify constants are correctly set
if !strings.HasPrefix(sosAPISystemXML, sosAPISystemFolder) {
t.Errorf("sosAPISystemXML should start with sosAPISystemFolder")
}
if !strings.HasPrefix(sosAPICapacityXML, sosAPISystemFolder) {
t.Errorf("sosAPICapacityXML should start with sosAPISystemFolder")
}
if !strings.HasSuffix(sosAPISystemXML, "system.xml") {
t.Errorf("sosAPISystemXML should end with 'system.xml'")
}
if !strings.HasSuffix(sosAPICapacityXML, "capacity.xml") {
t.Errorf("sosAPICapacityXML should end with 'capacity.xml'")
}
// Protocol version should be quoted per SOSAPI spec
if !strings.HasPrefix(sosAPIProtocolVersion, "\"") || !strings.HasSuffix(sosAPIProtocolVersion, "\"") {
t.Errorf("sosAPIProtocolVersion should be quoted, got: %s", sosAPIProtocolVersion)
}
}
func TestSystemInfoXMLRootElement(t *testing.T) {
xmlData, err := generateSystemXML()
if err != nil {
t.Fatalf("generateSystemXML() failed: %v", err)
}
xmlStr := string(xmlData)
// Verify root element name
if !strings.Contains(xmlStr, "<SystemInfo>") {
t.Error("XML should contain <SystemInfo> root element")
}
// Verify required elements
requiredElements := []string{
"<ProtocolVersion>",
"<ModelName>",
"<ProtocolCapabilities>",
"<CapacityInfo>",
}
for _, elem := range requiredElements {
if !strings.Contains(xmlStr, elem) {
t.Errorf("XML should contain %s element", elem)
}
}
}
// TestSOSAPIHandlerIntegration tests the basic handler flow without a full server
func TestSOSAPIObjectDetectionEdgeCases(t *testing.T) {
// Test various edge cases for object detection
edgeCases := []struct {
object string
expected bool
}{
// With leading slash
{"/.system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/system.xml", false},
// URL encoded
{".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c%2Fsystem.xml", false},
// Mixed case
{".System-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/system.xml", false},
// Extra slashes
{".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c//system.xml", false},
// Correct paths
{".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/system.xml", true},
{".system-d26a9498-cb7c-4a87-a44a-8ae204f5ba6c/capacity.xml", true},
}
for _, tc := range edgeCases {
result := isSOSAPIObject(tc.object)
if result != tc.expected {
t.Errorf("isSOSAPIObject(%q) = %v, want %v", tc.object, result, tc.expected)
}
}
}
// TestSOSAPIHandlerReturnsXMLContentType verifies content-type setting logic
func TestSOSAPIXMLContentType(t *testing.T) {
// Create a mock response writer to check headers
w := httptest.NewRecorder()
// Simulate what the handler should set
w.Header().Set("Content-Type", "application/xml")
contentType := w.Header().Get("Content-Type")
if contentType != "application/xml" {
t.Errorf("Content-Type = %q, want 'application/xml'", contentType)
}
}
func TestHTTPTimeFormat(t *testing.T) {
// Verify the Last-Modified header format is correct for HTTP
w := httptest.NewRecorder()
w.Header().Set("Last-Modified", "Sat, 28 Dec 2024 20:00:00 GMT")
lastMod := w.Header().Get("Last-Modified")
if lastMod == "" {
t.Error("Last-Modified header should be set")
}
// HTTP date should contain day of week
if !strings.Contains(lastMod, "Dec") {
t.Errorf("Last-Modified should contain month, got: %s", lastMod)
}
}
Loading…
Cancel
Save