Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/8026/merge
Chris Lu, 2 days ago
parent commit 217d977579
  1. .github/workflows/s3-tables-tests.yml (9 changed lines)
  2. k8s/charts/seaweedfs/templates/s3/s3-service.yaml (3 changed lines)
  3. k8s/charts/seaweedfs/templates/shared/_helpers.tpl (9 changed lines)
  4. test/s3/policy/policy_test.go (58 changed lines)
  5. test/s3tables/catalog/iceberg_catalog_test.go (2 changed lines)
  6. test/s3tables/catalog_trino/trino_catalog_test.go (536 changed lines)
  7. weed/command/mini.go (4 changed lines)
  8. weed/command/mount.go (2 changed lines)
  9. weed/command/mount_std.go (11 changed lines)
  10. weed/pb/server_address.go (6 changed lines)
  11. weed/pb/volume_server.proto (5 changed lines)
  12. weed/pb/volume_server_pb/volume_server.pb.go (28 changed lines)
  13. weed/s3api/auth_credentials.go (35 changed lines)
  14. weed/s3api/auth_signature_v4.go (25 changed lines)
  15. weed/s3api/iceberg/iceberg.go (209 changed lines)
  16. weed/s3api/s3api_tables.go (5 changed lines)
  17. weed/s3api/s3tables/handler.go (23 changed lines)
  18. weed/s3api/s3tables/handler_namespace.go (5 changed lines)
  19. weed/s3api/s3tables/permissions.go (3 changed lines)
  20. weed/server/volume_grpc_admin.go (2 changed lines)
  21. weed/server/volume_grpc_scrub.go (18 changed lines)
  22. weed/server/volume_grpc_state.go (6 changed lines)
  23. weed/server/volume_server.go (2 changed lines)
  24. weed/server/volume_server_test.go (7 changed lines)
  25. weed/shell/command_ec_scrub.go (4 changed lines)
  26. weed/shell/command_volume_scrub.go (4 changed lines)
  27. weed/storage/store.go (10 changed lines)
  28. weed/storage/store_state.go (93 changed lines)

.github/workflows/s3-tables-tests.yml (9 changed lines)

@ -140,13 +140,16 @@ jobs:
- name: Set up Docker
uses: docker/setup-buildx-action@v3
- name: Pre-pull Trino image
run: docker pull trinodb/trino:479
- name: Install SeaweedFS
run: |
go install -buildvcs=false ./weed
- name: Run Trino Iceberg Catalog Integration Tests
timeout-minutes: 25
working-directory: test/s3/catalog_trino
working-directory: test/s3tables/catalog_trino
run: |
set -x
set -o pipefail
@ -164,7 +167,7 @@ jobs:
- name: Show test output on failure
if: failure()
working-directory: test/s3/catalog_trino
working-directory: test/s3tables/catalog_trino
run: |
echo "=== Test Output ==="
if [ -f test-output.log ]; then
@ -179,7 +182,7 @@ jobs:
uses: actions/upload-artifact@v6
with:
name: trino-iceberg-catalog-test-logs
path: test/s3/catalog_trino/test-output.log
path: test/s3tables/catalog_trino/test-output.log
retention-days: 3
s3-tables-build-verification:

k8s/charts/seaweedfs/templates/s3/s3-service.yaml (3 changed lines)

@ -16,6 +16,9 @@ metadata:
{{- end }}
spec:
internalTrafficPolicy: {{ .Values.s3.internalTrafficPolicy | default "Cluster" }}
{{- if and (semverCompare ">=1.31-0" .Capabilities.KubeVersion.GitVersion) (or .Values.s3.trafficDistribution .Values.filer.s3.trafficDistribution) }}
trafficDistribution: {{ include "seaweedfs.trafficDistribution" . }}
{{- end }}
ports:
- name: "swfs-s3"
port: {{ if .Values.s3.enabled }}{{ .Values.s3.port }}{{ else }}{{ .Values.filer.s3.port }}{{ end }}

k8s/charts/seaweedfs/templates/shared/_helpers.tpl (9 changed lines)

@ -323,3 +323,12 @@ Create the name of the service account to use
{{- define "seaweedfs.serviceAccountName" -}}
{{- .Values.global.serviceAccountName | default "seaweedfs" -}}
{{- end -}}
{{/* Generate a compatible trafficDistribution value due to "PreferClose" fast deprecation in k8s v1.35 */}}
{{- define "seaweedfs.trafficDistribution" -}}
{{- if .Values.s3.trafficDistribution -}}
{{- and (eq .Values.s3.trafficDistribution "PreferClose") (semverCompare ">=1.35-0" .Capabilities.KubeVersion.GitVersion) | ternary "PreferSameZone" .Values.s3.trafficDistribution -}}
{{- else if .Values.filer.s3.trafficDistribution -}}
{{- and (eq .Values.filer.s3.trafficDistribution "PreferClose") (semverCompare ">=1.35-0" .Capabilities.KubeVersion.GitVersion) | ternary "PreferSameZone" .Values.filer.s3.trafficDistribution -}}
{{- end -}}
{{- end -}}

test/s3/policy/policy_test.go (58 changed lines)

@ -16,22 +16,25 @@ import (
"github.com/seaweedfs/seaweedfs/weed/command"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
flag "github.com/seaweedfs/seaweedfs/weed/util/fla9"
"github.com/stretchr/testify/require"
)
// TestCluster manages the weed mini instance for integration testing
type TestCluster struct {
dataDir string
ctx context.Context
cancel context.CancelFunc
isRunning bool
wg sync.WaitGroup
masterPort int
volumePort int
filerPort int
s3Port int
s3Endpoint string
dataDir string
ctx context.Context
cancel context.CancelFunc
isRunning bool
wg sync.WaitGroup
masterPort int
masterGrpcPort int
volumePort int
filerPort int
filerGrpcPort int
s3Port int
s3Endpoint string
}
func TestS3PolicyShellRevised(t *testing.T) {
@ -53,8 +56,8 @@ func TestS3PolicyShellRevised(t *testing.T) {
require.NoError(t, tmpPolicyFile.Close())
weedCmd := "weed"
masterAddr := fmt.Sprintf("127.0.0.1:%d", cluster.masterPort)
filerAddr := fmt.Sprintf("127.0.0.1:%d", cluster.filerPort)
masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort))
filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort))
// Put
execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=testpolicy -file=%s", tmpPolicyFile.Name()))
@ -156,17 +159,16 @@ func findAvailablePort() (int, error) {
// findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available
func findAvailablePortPair() (int, int, error) {
httpPort, err := findAvailablePort()
if err != nil {
return 0, 0, err
}
for i := 0; i < 100; i++ {
httpPort, err := findAvailablePort()
grpcPort, err := findAvailablePort()
if err != nil {
return 0, 0, err
}
grpcPort := httpPort + 10000
// check if grpc port is available
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort))
if err == nil {
listener.Close()
if grpcPort != httpPort {
return httpPort, grpcPort, nil
}
}
@ -188,14 +190,16 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
ctx, cancel := context.WithCancel(context.Background())
s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port)
cluster := &TestCluster{
dataDir: testDir,
ctx: ctx,
cancel: cancel,
masterPort: masterPort,
volumePort: volumePort,
filerPort: filerPort,
s3Port: s3Port,
s3Endpoint: s3Endpoint,
dataDir: testDir,
ctx: ctx,
cancel: cancel,
masterPort: masterPort,
masterGrpcPort: masterGrpcPort,
volumePort: volumePort,
filerPort: filerPort,
filerGrpcPort: filerGrpcPort,
s3Port: s3Port,
s3Endpoint: s3Endpoint,
}
// Disable authentication for tests

test/s3tables/catalog/iceberg_catalog_test.go (2 changed lines)

@ -275,7 +275,7 @@ func TestIcebergNamespaces(t *testing.T) {
env.StartSeaweedFS(t)
// Create the default table bucket first via S3
createTableBucket(t, env, "default")
createTableBucket(t, env, "warehouse")
// Test GET /v1/namespaces (should return empty list initially)
resp, err := http.Get(env.IcebergURL() + "/v1/namespaces")

test/s3tables/catalog_trino/trino_catalog_test.go (536 changed lines)

@ -0,0 +1,536 @@
package catalog_trino
import (
"context"
"crypto/rand"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
type TestEnvironment struct {
seaweedDir string
weedBinary string
dataDir string
bindIP string
s3Port int
s3GrpcPort int
icebergPort int
masterPort int
masterGrpcPort int
filerPort int
filerGrpcPort int
volumePort int
volumeGrpcPort int
weedProcess *exec.Cmd
weedCancel context.CancelFunc
trinoContainer string
dockerAvailable bool
accessKey string
secretKey string
}
func TestTrinoIcebergCatalog(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
if !env.dockerAvailable {
t.Skip("Docker not available, skipping Trino integration test")
}
fmt.Printf(">>> Starting SeaweedFS...\n")
env.StartSeaweedFS(t)
fmt.Printf(">>> SeaweedFS started.\n")
catalogBucket := "warehouse"
tableBucket := "iceberg-tables"
fmt.Printf(">>> Creating table bucket: %s\n", tableBucket)
createTableBucket(t, env, tableBucket)
fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket)
createTableBucket(t, env, catalogBucket)
fmt.Printf(">>> All buckets created.\n")
// Test Iceberg REST API directly
testIcebergRestAPI(t, env)
configDir := env.writeTrinoConfig(t, catalogBucket)
env.startTrinoContainer(t, configDir)
waitForTrino(t, env.trinoContainer, 60*time.Second)
schemaName := "trino_" + randomString(6)
runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName))
output := runTrinoSQL(t, env.trinoContainer, "SHOW SCHEMAS FROM iceberg")
if !strings.Contains(output, schemaName) {
t.Fatalf("Expected schema %s in output:\n%s", schemaName, output)
}
runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SHOW TABLES FROM iceberg.%s", schemaName))
}
func NewTestEnvironment(t *testing.T) *TestEnvironment {
t.Helper()
wd, err := os.Getwd()
if err != nil {
t.Fatalf("Failed to get working directory: %v", err)
}
seaweedDir := wd
for i := 0; i < 6; i++ {
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
break
}
seaweedDir = filepath.Dir(seaweedDir)
}
weedBinary := filepath.Join(seaweedDir, "weed", "weed")
if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
weedBinary = "weed"
if _, err := exec.LookPath(weedBinary); err != nil {
t.Skip("weed binary not found, skipping integration test")
}
}
dataDir, err := os.MkdirTemp("", "seaweed-trino-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
bindIP := findBindIP()
masterPort, masterGrpcPort := mustFreePortPair(t, "Master")
volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume")
filerPort, filerGrpcPort := mustFreePortPair(t, "Filer")
s3Port, s3GrpcPort := mustFreePortPair(t, "S3")
icebergPort := mustFreePort(t, "Iceberg")
return &TestEnvironment{
seaweedDir: seaweedDir,
weedBinary: weedBinary,
dataDir: dataDir,
bindIP: bindIP,
s3Port: s3Port,
s3GrpcPort: s3GrpcPort,
icebergPort: icebergPort,
masterPort: masterPort,
masterGrpcPort: masterGrpcPort,
filerPort: filerPort,
filerGrpcPort: filerGrpcPort,
volumePort: volumePort,
volumeGrpcPort: volumeGrpcPort,
dockerAvailable: hasDocker(),
accessKey: "AKIAIOSFODNN7EXAMPLE",
secretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
}
}
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Helper()
// Create IAM config file
iamConfigPath := filepath.Join(env.dataDir, "iam_config.json")
iamConfig := fmt.Sprintf(`{
"identities": [
{
"name": "admin",
"credentials": [
{
"accessKey": "%s",
"secretKey": "%s"
}
],
"actions": [
"Admin",
"Read",
"List",
"Tagging",
"Write"
]
}
]
}`, env.accessKey, env.secretKey)
if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil {
t.Fatalf("Failed to create IAM config: %v", err)
}
securityToml := filepath.Join(env.dataDir, "security.toml")
if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil {
t.Fatalf("Failed to create security.toml: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
env.weedCancel = cancel
cmd := exec.CommandContext(ctx, env.weedBinary, "mini",
"-master.port", fmt.Sprintf("%d", env.masterPort),
"-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort),
"-volume.port", fmt.Sprintf("%d", env.volumePort),
"-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort),
"-filer.port", fmt.Sprintf("%d", env.filerPort),
"-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort),
"-s3.port", fmt.Sprintf("%d", env.s3Port),
"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort),
"-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort),
"-s3.config", iamConfigPath,
"-ip", env.bindIP,
"-ip.bind", "0.0.0.0",
"-dir", env.dataDir,
)
cmd.Dir = env.dataDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Set AWS credentials in environment (for compatibility)
cmd.Env = append(os.Environ(),
"AWS_ACCESS_KEY_ID="+env.accessKey,
"AWS_SECRET_ACCESS_KEY="+env.secretKey,
)
if err := cmd.Start(); err != nil {
t.Fatalf("Failed to start SeaweedFS: %v", err)
}
env.weedProcess = cmd
// Try to check if Iceberg API is ready
// First try checking the /v1/config endpoint (requires auth, so will return 401 if server is up)
icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort)
if !env.waitForService(icebergURL, 30*time.Second) {
// Try to get more info about why it failed
client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Get(icebergURL)
if err != nil {
t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err)
} else {
t.Logf("WARNING: Iceberg service returned status %d at %s", resp.StatusCode, icebergURL)
resp.Body.Close()
}
t.Fatalf("Iceberg REST API did not become ready")
}
}
func (env *TestEnvironment) Cleanup(t *testing.T) {
t.Helper()
if env.trinoContainer != "" {
_ = exec.Command("docker", "rm", "-f", env.trinoContainer).Run()
}
if env.weedCancel != nil {
env.weedCancel()
}
if env.weedProcess != nil {
time.Sleep(2 * time.Second)
_ = env.weedProcess.Wait()
}
if env.dataDir != "" {
_ = os.RemoveAll(env.dataDir)
}
}
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
client := &http.Client{Timeout: 2 * time.Second}
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
resp, err := client.Get(url)
if err != nil {
// Service not responding yet
time.Sleep(500 * time.Millisecond)
continue
}
statusCode := resp.StatusCode
resp.Body.Close()
// Accept 2xx status codes (successful responses)
if statusCode >= 200 && statusCode < 300 {
return true
}
// Also accept 401/403 (auth errors mean service is up, just needs credentials)
if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden {
return true
}
// For other status codes, keep trying
time.Sleep(500 * time.Millisecond)
}
return false
}
func testIcebergRestAPI(t *testing.T, env *TestEnvironment) {
t.Helper()
fmt.Printf(">>> Testing Iceberg REST API directly...\n")
// First, verify the service is listening
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort))
if err != nil {
t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err)
}
conn.Close()
t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort)
// Test /v1/config endpoint
url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort)
t.Logf("Testing Iceberg REST API at %s", url)
resp, err := http.Get(url)
if err != nil {
t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err)
}
defer resp.Body.Close()
t.Logf("Iceberg REST API response status: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
t.Logf("Iceberg REST API response body: %s", string(body))
if resp.StatusCode != http.StatusOK {
t.Fatalf("Expected 200 OK from /v1/config, got %d", resp.StatusCode)
}
}
func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket string) string {
t.Helper()
configDir := filepath.Join(env.dataDir, "trino")
if err := os.MkdirAll(configDir, 0755); err != nil {
t.Fatalf("Failed to create Trino config dir: %v", err)
}
config := fmt.Sprintf(`connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://host.docker.internal:%d
iceberg.rest-catalog.warehouse=s3://%s/
iceberg.file-format=PARQUET
# S3 storage config
fs.native-s3.enabled=true
s3.endpoint=http://host.docker.internal:%d
s3.path-style-access=true
s3.signer-type=AwsS3V4Signer
s3.aws-access-key=%s
s3.aws-secret-key=%s
s3.region=us-west-2
# REST catalog authentication
iceberg.rest-catalog.security=SIGV4
`, env.icebergPort, warehouseBucket, env.s3Port, env.accessKey, env.secretKey)
if err := os.WriteFile(filepath.Join(configDir, "iceberg.properties"), []byte(config), 0644); err != nil {
t.Fatalf("Failed to write Trino config: %v", err)
}
return configDir
}
func (env *TestEnvironment) startTrinoContainer(t *testing.T, configDir string) {
t.Helper()
containerName := "seaweed-trino-" + randomString(8)
env.trinoContainer = containerName
cmd := exec.Command("docker", "run", "-d",
"--name", containerName,
"--add-host", "host.docker.internal:host-gateway",
"-v", fmt.Sprintf("%s:/etc/trino/catalog", configDir),
"-v", fmt.Sprintf("%s:/test", env.dataDir),
"-e", "AWS_ACCESS_KEY_ID="+env.accessKey,
"-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey,
"-e", "AWS_REGION=us-west-2",
"trinodb/trino:479",
)
if output, err := cmd.CombinedOutput(); err != nil {
t.Fatalf("Failed to start Trino container: %v\n%s", err, string(output))
}
}
func waitForTrino(t *testing.T, containerName string, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
var lastOutput []byte
retryCount := 0
for time.Now().Before(deadline) {
// Try system catalog query as a readiness check
cmd := exec.Command("docker", "exec", containerName,
"trino", "--catalog", "system", "--schema", "runtime",
"--execute", "SELECT 1",
)
if output, err := cmd.CombinedOutput(); err == nil {
return
} else {
lastOutput = output
outputStr := string(output)
if strings.Contains(outputStr, "No such container") ||
strings.Contains(outputStr, "is not running") {
break
}
retryCount++
}
time.Sleep(1 * time.Second)
}
// If we can't connect to system catalog, try to at least connect to Trino server
cmd := exec.Command("docker", "exec", containerName, "trino", "--version")
if err := cmd.Run(); err == nil {
// Trino process is running, even if catalog isn't ready yet
// Give it a bit more time
time.Sleep(5 * time.Second)
return
}
logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs))
}
func runTrinoSQL(t *testing.T, containerName, sql string) string {
t.Helper()
cmd := exec.Command("docker", "exec", containerName,
"trino", "--catalog", "iceberg",
"--output-format", "CSV",
"--execute", sql,
)
output, err := cmd.CombinedOutput()
if err != nil {
logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs))
}
return string(output)
}
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
t.Helper()
// Use weed shell to create the table bucket
// Create with "000000000000" account ID (matches AccountAdmin.Id from auth_credentials.go)
// This ensures bucket owner matches authenticated identity's Account.Id
cmd := exec.Command(env.weedBinary, "shell",
fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort),
)
cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName))
fmt.Printf(">>> EXECUTING: %v\n", cmd.Args)
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Printf(">>> ERROR Output: %s\n", string(output))
t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output))
}
fmt.Printf(">>> SUCCESS: Created table bucket %s\n", bucketName)
t.Logf("Created table bucket: %s", bucketName)
}
func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) {
t.Helper()
// Create an AWS S3 client with the test credentials pointing to our local server
cfg := aws.Config{
Region: "us-east-1",
Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")),
BaseEndpoint: aws.String(fmt.Sprintf("http://%s:%d", env.bindIP, env.s3Port)),
}
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = true
})
// Create the bucket using standard S3 API
_, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
t.Fatalf("Failed to create object bucket %s: %v", bucketName, err)
}
}
func hasDocker() bool {
cmd := exec.Command("docker", "version")
return cmd.Run() == nil
}
func mustFreePort(t *testing.T, name string) int {
t.Helper()
port, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for %s: %v", name, err)
}
return port
}
func mustFreePortPair(t *testing.T, name string) (int, int) {
t.Helper()
httpPort, grpcPort, err := findAvailablePortPair()
if err != nil {
t.Fatalf("Failed to get free port pair for %s: %v", name, err)
}
return httpPort, grpcPort
}
func findAvailablePortPair() (int, int, error) {
httpPort, err := getFreePort()
if err != nil {
return 0, 0, err
}
grpcPort, err := getFreePort()
if err != nil {
return 0, 0, err
}
return httpPort, grpcPort, nil
}
func getFreePort() (int, error) {
listener, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
return 0, err
}
defer listener.Close()
addr := listener.Addr().(*net.TCPAddr)
return addr.Port, nil
}
func findBindIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "127.0.0.1"
}
for _, addr := range addrs {
ipNet, ok := addr.(*net.IPNet)
if !ok || ipNet.IP == nil {
continue
}
ip := ipNet.IP.To4()
if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
}
return ip.String()
}
return "127.0.0.1"
}
func randomString(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, length)
if _, err := rand.Read(b); err != nil {
panic("failed to generate random string: " + err.Error())
}
for i := range b {
b[i] = charset[int(b[i])%len(charset)]
}
return string(b)
}

weed/command/mini.go (4 changed lines)

@ -963,8 +963,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// Determine bind IP for health checks
bindIp := getBindIp()
// Prepare master address
masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port)
// Prepare master address with gRPC port
masterAddr := string(pb.NewServerAddress(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc))
// Set admin options
*miniAdminOptions.master = masterAddr

weed/command/mount.go (2 changed lines)

@ -24,6 +24,7 @@ type MountOptions struct {
cacheSizeMBForRead *int64
dataCenter *string
allowOthers *bool
defaultPermissions *bool
umaskString *string
nonempty *bool
volumeServerAccess *string
@ -86,6 +87,7 @@ func init() {
mountOptions.cacheMetaTtlSec = cmdMount.Flag.Int("cacheMetaTtlSec", 60, "metadata cache validity seconds")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.defaultPermissions = cmdMount.Flag.Bool("defaultPermissions", true, "enforce permissions by the operating system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
mountOptions.volumeServerAccess = cmdMount.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]")

weed/command/mount_std.go (11 changed lines)

@ -188,6 +188,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
//SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
EnableAcl: true,
}
if *option.defaultPermissions {
fuseMountOptions.Options = append(fuseMountOptions.Options, "default_permissions")
}
if *option.nonempty {
fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
}
@ -216,8 +219,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
}
fuseMountOptions.EnableWriteback = *option.writebackCache
fuseMountOptions.EnableAsyncDio = *option.asyncDio
if option.writebackCache != nil {
fuseMountOptions.EnableWriteback = *option.writebackCache
}
if option.asyncDio != nil {
fuseMountOptions.EnableAsyncDio = *option.asyncDio
}
if option.cacheSymlink != nil && *option.cacheSymlink {
fuseMountOptions.EnableSymlinkCaching = true
}

weed/pb/server_address.go (6 changed lines)

@ -15,7 +15,7 @@ type ServerAddresses string
type ServerSrvAddress string
func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
if grpcPort == 0 || grpcPort == port+10000 {
if grpcPort == 0 {
return ServerAddress(util.JoinHostPort(host, port))
}
return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort))
@ -25,10 +25,6 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress {
if grpcPort == 0 {
return ServerAddress(address)
}
_, port, _ := hostAndPort(address)
if uint64(grpcPort) == port+10000 {
return ServerAddress(address)
}
return ServerAddress(address + "." + strconv.Itoa(grpcPort))
}
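
With the special case for grpcPort == port+10000 removed, any non-zero gRPC port is now always spelled out in the address string. A small sketch of what the two constructors return under the changed code above (the port values are illustrative only):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb"
)

func main() {
	// A gRPC port of zero still yields the plain "host:port" form.
	fmt.Println(pb.NewServerAddress("127.0.0.1", 9333, 0)) // 127.0.0.1:9333

	// A non-zero gRPC port, even the conventional port+10000, is appended as ".<grpcPort>".
	fmt.Println(pb.NewServerAddress("127.0.0.1", 9333, 19333)) // 127.0.0.1:9333.19333

	// The same rule applies when attaching a gRPC port to an existing "host:port" string.
	fmt.Println(pb.NewServerAddressWithGrpcPort("127.0.0.1:8888", 18888)) // 127.0.0.1:8888.18888
}

This is the format the updated policy test relies on when it builds masterAddr and filerAddr via pb.NewServerAddress.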

weed/pb/volume_server.proto (5 changed lines)

@ -9,8 +9,10 @@ import "remote.proto";
// Persistent state for volume servers.
message VolumeServerState {
// Whether the server is in maintenance (i.e. read-only) mode.
// whether the server is in maintenance (i.e. read-only) mode.
bool maintenance = 1;
// incremental version counter
uint32 version = 2;
}
//////////////////////////////////////////////////
@ -643,6 +645,7 @@ enum VolumeScrubMode {
UNKNOWN = 0;
INDEX = 1;
FULL = 2;
LOCAL = 3;
}
message ScrubVolumeRequest {

weed/pb/volume_server_pb/volume_server.pb.go (28 changed lines)

@ -28,6 +28,7 @@ const (
VolumeScrubMode_UNKNOWN VolumeScrubMode = 0
VolumeScrubMode_INDEX VolumeScrubMode = 1
VolumeScrubMode_FULL VolumeScrubMode = 2
VolumeScrubMode_LOCAL VolumeScrubMode = 3
)
// Enum value maps for VolumeScrubMode.
@ -36,11 +37,13 @@ var (
0: "UNKNOWN",
1: "INDEX",
2: "FULL",
3: "LOCAL",
}
VolumeScrubMode_value = map[string]int32{
"UNKNOWN": 0,
"INDEX": 1,
"FULL": 2,
"LOCAL": 3,
}
)
@ -74,8 +77,10 @@ func (VolumeScrubMode) EnumDescriptor() ([]byte, []int) {
// Persistent state for volume servers.
type VolumeServerState struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Whether the server is in maintenance (i.e. read-only) mode.
Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"`
// whether the server is in maintenance (i.e. read-only) mode.
Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"`
// incremental version counter
Version uint32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -117,6 +122,13 @@ func (x *VolumeServerState) GetMaintenance() bool {
return false
}
func (x *VolumeServerState) GetVersion() uint32 {
if x != nil {
return x.Version
}
return 0
}
type BatchDeleteRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"`
@ -1855,7 +1867,7 @@ func (x *GetStateResponse) GetState() *VolumeServerState {
type SetStateRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// SetState updates *all* volume server flags at once. Retrieve state with GetState(),
// SetState updates *all* volume server flags at once. Retrieve state/version with GetState(),
// modify individual flags as required, then call this RPC to update.
State *VolumeServerState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
unknownFields protoimpl.UnknownFields
@ -6687,9 +6699,10 @@ var File_volume_server_proto protoreflect.FileDescriptor
const file_volume_server_proto_rawDesc = "" +
"\n" +
"\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"5\n" +
"\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"O\n" +
"\x11VolumeServerState\x12 \n" +
"\vmaintenance\x18\x01 \x01(\bR\vmaintenance\"[\n" +
"\vmaintenance\x18\x01 \x01(\bR\vmaintenance\x12\x18\n" +
"\aversion\x18\x02 \x01(\rR\aversion\"[\n" +
"\x12BatchDeleteRequest\x12\x19\n" +
"\bfile_ids\x18\x01 \x03(\tR\afileIds\x12*\n" +
"\x11skip_cookie_check\x18\x02 \x01(\bR\x0fskipCookieCheck\"O\n" +
@ -7180,11 +7193,12 @@ const file_volume_server_proto_rawDesc = "" +
"\rstart_time_ns\x18\x01 \x01(\x03R\vstartTimeNs\x12$\n" +
"\x0eremote_time_ns\x18\x02 \x01(\x03R\fremoteTimeNs\x12 \n" +
"\fstop_time_ns\x18\x03 \x01(\x03R\n" +
"stopTimeNs*3\n" +
"stopTimeNs*>\n" +
"\x0fVolumeScrubMode\x12\v\n" +
"\aUNKNOWN\x10\x00\x12\t\n" +
"\x05INDEX\x10\x01\x12\b\n" +
"\x04FULL\x10\x022\xfb(\n" +
"\x04FULL\x10\x02\x12\t\n" +
"\x05LOCAL\x10\x032\xfb(\n" +
"\fVolumeServer\x12\\\n" +
"\vBatchDelete\x12$.volume_server_pb.BatchDeleteRequest\x1a%.volume_server_pb.BatchDeleteResponse\"\x00\x12n\n" +
"\x11VacuumVolumeCheck\x12*.volume_server_pb.VacuumVolumeCheckRequest\x1a+.volume_server_pb.VacuumVolumeCheckResponse\"\x00\x12v\n" +

weed/s3api/auth_credentials.go (35 changed lines)

@ -538,17 +538,45 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3
}
iam.m.Lock()
// Save existing environment-based identities before replacement
// This ensures AWS_ACCESS_KEY_ID credentials are preserved
envIdentities := make([]*Identity, 0)
for _, ident := range iam.identities {
if ident.IsStatic && strings.HasPrefix(ident.Name, "admin-") {
// This is an environment-based admin identity, preserve it
envIdentities = append(envIdentities, ident)
}
}
// atomically switch
iam.identities = identities
iam.identityAnonymous = identityAnonymous
iam.accounts = accounts
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.accessKeyIdent = accessKeyIdent
iam.policies = policies
// Re-add environment-based identities that were preserved
for _, envIdent := range envIdentities {
// Check if this identity already exists in the new config
exists := false
for _, ident := range iam.identities {
if ident.Name == envIdent.Name {
exists = true
break
}
}
if !exists {
iam.identities = append(iam.identities, envIdent)
iam.accessKeyIdent[envIdent.Credentials[0].AccessKey] = envIdent
iam.nameToIdentity[envIdent.Name] = envIdent
}
}
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))
authJustEnabled := iam.updateAuthenticationState(len(iam.identities))
iam.m.Unlock()
if authJustEnabled {
@ -778,9 +806,10 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap
iam.identityAnonymous = identityAnonymous
iam.accounts = accounts
iam.emailAccount = emailAccount
iam.accessKeyIdent = accessKeyIdent
iam.nameToIdentity = nameToIdentity
iam.accessKeyIdent = accessKeyIdent
iam.policies = policies
iam.accessKeyIdent = accessKeyIdent
// Update authentication state based on whether identities exist
// Once enabled, keep it enabled (one-way toggle)
authJustEnabled := iam.updateAuthenticationState(len(identities))

weed/s3api/auth_signature_v4.go (25 changed lines)

@ -22,6 +22,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"crypto/subtle"
"encoding/base64"
"encoding/hex"
"io"
"net"
@ -104,6 +105,24 @@ func getContentSha256Cksum(r *http.Request) string {
return emptySHA256
}
// normalizePayloadHash converts base64-encoded payload hash to hex format.
// AWS SigV4 canonical requests always use hex-encoded SHA256.
func normalizePayloadHash(payloadHashValue string) string {
// Special values and hex-encoded hashes don't need conversion
if payloadHashValue == emptySHA256 || payloadHashValue == unsignedPayload ||
payloadHashValue == streamingContentSHA256 || payloadHashValue == streamingContentSHA256Trailer ||
payloadHashValue == streamingUnsignedPayload || len(payloadHashValue) == 64 {
return payloadHashValue
}
// Try to decode as base64 and convert to hex
if decodedBytes, err := base64.StdEncoding.DecodeString(payloadHashValue); err == nil && len(decodedBytes) == 32 {
return hex.EncodeToString(decodedBytes)
}
return payloadHashValue
}
// signValues data type represents structured form of AWS Signature V4 header.
type signValues struct {
Credential credentialHeader
@ -485,6 +504,10 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode)
}
}
// Normalize payload hash to hex format for canonical request
// AWS SigV4 canonical requests always use hex-encoded SHA256
normalizedPayload := normalizePayloadHash(hashedPayload)
return &v4AuthInfo{
Signature: signV4Values.Signature,
AccessKey: signV4Values.Credential.accessKey,
@ -493,7 +516,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode)
Region: signV4Values.Credential.scope.region,
Service: signV4Values.Credential.scope.service,
Scope: signV4Values.Credential.getScope(),
HashedPayload: hashedPayload,
HashedPayload: normalizedPayload,
IsPresigned: false,
}, s3err.ErrNone
}
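
normalizePayloadHash exists because SigV4 canonical requests always carry the payload SHA-256 as 64 hex characters, while some clients send the digest base64-encoded in x-amz-content-sha256. A self-contained sketch of the same conversion on a hypothetical payload (not the production helper):

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

func main() {
	sum := sha256.Sum256([]byte("example payload"))

	// Hex form: what a SigV4 canonical request expects (64 hex characters).
	hexHash := hex.EncodeToString(sum[:])

	// Base64 form: what some clients place in x-amz-content-sha256 instead.
	b64Hash := base64.StdEncoding.EncodeToString(sum[:])

	// The normalization mirrors normalizePayloadHash above: a value that decodes
	// from base64 to exactly 32 bytes is re-encoded as hex before signing.
	if decoded, err := base64.StdEncoding.DecodeString(b64Hash); err == nil && len(decoded) == 32 {
		fmt.Println(hex.EncodeToString(decoded) == hexHash) // true
	}
}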

weed/s3api/iceberg/iceberg.go (209 changed lines)

@ -18,37 +18,11 @@ import (
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
func (s *Server) checkAuth(w http.ResponseWriter, r *http.Request, action s3api.Action, bucketName string) bool {
identityName := s3_constants.GetIdentityNameFromContext(r)
if identityName == "" {
writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required")
return false
}
identityObj := s3_constants.GetIdentityFromContext(r)
if identityObj == nil {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: missing identity")
return false
}
identity, ok := identityObj.(*s3api.Identity)
if !ok {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: invalid identity")
return false
}
if !identity.CanDo(action, bucketName, "") {
writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied")
return false
}
return true
}
// FilerClient provides access to the filer for storage operations.
type FilerClient interface {
WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error
@ -79,17 +53,20 @@ func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server {
// RegisterRoutes registers Iceberg REST API routes on the provided router.
func (s *Server) RegisterRoutes(router *mux.Router) {
// Configuration endpoint
router.HandleFunc("/v1/config", s.Auth(s.handleConfig)).Methods(http.MethodGet)
// Add middleware to log all requests/responses
router.Use(loggingMiddleware)
// Configuration endpoint - no auth needed for config
router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet)
// Namespace endpoints
// Namespace endpoints - wrapped with Auth middleware
router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead)
router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete)
// Table endpoints
// Table endpoints - wrapped with Auth middleware
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet)
router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost)
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet)
@ -97,7 +74,7 @@ func (s *Server) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
// With prefix support
// With prefix support - wrapped with Auth middleware
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet)
router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost)
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet)
@ -110,7 +87,48 @@ func (s *Server) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete)
router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost)
glog.V(0).Infof("Registered Iceberg REST Catalog routes")
// Catch-all for debugging
router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI)
writeError(w, http.StatusNotFound, "NotFound", "Path not found")
})
glog.V(2).Infof("Registered Iceberg REST Catalog routes")
}
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr)
// Log all headers for debugging
glog.V(2).Infof("Iceberg REST headers:")
for name, values := range r.Header {
for _, value := range values {
// Redact sensitive headers
if name == "Authorization" && len(value) > 20 {
glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:])
} else {
glog.V(2).Infof(" %s: %s", name, value)
}
}
}
// Create a response writer that captures the status code
wrapped := &responseWriter{ResponseWriter: w}
next.ServeHTTP(wrapped, r)
glog.V(2).Infof("Iceberg REST response: %s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode)
})
}
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (w *responseWriter) WriteHeader(code int) {
w.statusCode = code
w.ResponseWriter.WriteHeader(code)
}
func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc {
@ -293,8 +311,8 @@ func getBucketFromPrefix(r *http.Request) string {
if prefix := vars["prefix"]; prefix != "" {
return prefix
}
// Default bucket if no prefix
return "default"
// Default bucket if no prefix - use "warehouse" for Iceberg
return "warehouse"
}
// buildTableBucketARN builds an ARN for a table bucket.
@ -305,25 +323,28 @@ func buildTableBucketARN(bucketName string) string {
// handleConfig returns catalog configuration.
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
glog.Infof("handleConfig: START")
glog.Infof("handleConfig: setting Content-Type header")
w.Header().Set("Content-Type", "application/json")
config := CatalogConfig{
Defaults: map[string]string{},
Overrides: map[string]string{},
}
writeJSON(w, http.StatusOK, config)
glog.Infof("handleConfig: encoding JSON")
if err := json.NewEncoder(w).Encode(config); err != nil {
glog.Warningf("handleConfig: Failed to encode config: %v", err)
}
glog.Infof("handleConfig: COMPLETE")
}
// handleListNamespaces lists namespaces in a catalog.
func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Use S3 Tables manager to list namespaces
var resp s3tables.ListNamespacesResponse
req := &s3tables.ListNamespacesRequest{
@ -333,11 +354,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName)
})
if err != nil {
glog.V(1).Infof("Iceberg: ListNamespaces error: %v", err)
glog.Infof("Iceberg: ListNamespaces error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
return
}
@ -357,11 +378,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
// handleCreateNamespace creates a new namespace.
func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) {
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
var req CreateNamespaceRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body")
@ -382,15 +403,18 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, "")
glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName)
})
if err != nil {
glog.Errorf("Iceberg: handleCreateNamespace error: %v", err)
if strings.Contains(err.Error(), "already exists") {
writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error())
return
}
glog.V(1).Infof("Iceberg: CreateNamespace error: %v", err)
glog.Infof("Iceberg: CreateNamespace error: %v", err)
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
return
}
@ -418,11 +442,11 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Use S3 Tables manager to get namespace
getReq := &s3tables.GetNamespaceRequest{
TableBucketARN: bucketARN,
@ -432,7 +456,7 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName)
})
if err != nil {
@ -462,11 +486,11 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetNamespaceRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -475,7 +499,7 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName)
})
if err != nil {
@ -500,11 +524,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
deleteReq := &s3tables.DeleteNamespaceRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -512,10 +536,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, identityName)
})
if err != nil {
if strings.Contains(err.Error(), "not found") {
writeError(w, http.StatusNotFound, "NoSuchNamespaceException", fmt.Sprintf("Namespace does not exist: %v", namespace))
return
@ -542,11 +567,11 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
listReq := &s3tables.ListTablesRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -556,7 +581,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName)
})
if err != nil {
@ -605,11 +630,11 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Generate UUID for the new table
tableUUID := uuid.New()
location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name)
@ -657,7 +682,7 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, identityName)
})
if err != nil {
@ -696,11 +721,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -710,10 +735,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
if strings.Contains(err.Error(), "not found") {
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
return
@ -771,11 +797,11 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -785,7 +811,7 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
@ -807,11 +833,11 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
deleteReq := &s3tables.DeleteTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
@ -820,7 +846,7 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, identityName)
})
if err != nil {
@ -849,9 +875,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
}
bucketName := getBucketFromPrefix(r)
if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// Extract identity from context
identityName := s3_constants.GetIdentityNameFromContext(r)
// Parse the commit request
var req CommitTableRequest
@ -860,8 +887,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
return
}
bucketARN := buildTableBucketARN(bucketName)
// First, load current table metadata
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,
@ -872,7 +897,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
})
if err != nil {
@ -985,7 +1010,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
// 1. Write metadata file (this would normally be an S3 PutObject,
// but s3tables manager handles the metadata storage logic)
// For now, we assume s3tables.UpdateTable handles the reference update.
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, "")
return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName)
})
if err != nil {
@ -1002,14 +1027,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, result)
}
// loadTableResultJSON is used for JSON serialization of LoadTableResult.
// It wraps table.Metadata (which is an interface) for proper JSON output.
type loadTableResultJSON struct {
MetadataLocation string `json:"metadata-location,omitempty"`
Metadata table.Metadata `json:"metadata"`
Config iceberg.Properties `json:"config,omitempty"`
}
// newTableMetadata creates a new table.Metadata object with the given parameters.
// Uses iceberg-go's MetadataBuilder pattern for proper spec compliance.
func newTableMetadata(

weed/s3api/s3api_tables.go (5 changed lines)

@ -632,6 +632,7 @@ func buildUntagResourceRequest(r *http.Request) (interface{}, error) {
// which performs granular permission checks based on the specific operation.
func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infof("S3Tables: authenticateS3Tables called, iam.isEnabled()=%t", s3a.iam.isEnabled())
if !s3a.iam.isEnabled() {
f(w, r)
return
@ -640,15 +641,19 @@ func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFun
// Use AuthSignatureOnly to authenticate the request without authorizing specific actions
identity, errCode := s3a.iam.AuthSignatureOnly(r)
if errCode != s3err.ErrNone {
glog.Errorf("S3Tables: AuthSignatureOnly failed: %v", errCode)
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Store the authenticated identity in request context
if identity != nil && identity.Name != "" {
glog.V(2).Infof("S3Tables: authenticated identity Name=%s Account.Id=%s", identity.Name, identity.Account.Id)
ctx := s3_constants.SetIdentityNameInContext(r.Context(), identity.Name)
ctx = s3_constants.SetIdentityInContext(ctx, identity)
r = r.WithContext(ctx)
} else {
glog.V(2).Infof("S3Tables: authenticated identity is nil or empty name")
}
f(w, r)

weed/s3api/s3tables/handler.go (23 changed lines)

@ -164,9 +164,32 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request,
// This is also used as the principal for permission checks, ensuring alignment between
// the caller identity and ownership verification when IAM is enabled.
func (h *S3TablesHandler) getAccountID(r *http.Request) string {
identityRaw := s3_constants.GetIdentityFromContext(r)
if identityRaw != nil {
// Use reflection to access the Account.Id field to avoid import cycle
val := reflect.ValueOf(identityRaw)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
if val.Kind() == reflect.Struct {
accountField := val.FieldByName("Account")
if accountField.IsValid() && !accountField.IsNil() {
accountVal := accountField.Elem()
if accountVal.Kind() == reflect.Struct {
idField := accountVal.FieldByName("Id")
if idField.IsValid() && idField.Kind() == reflect.String {
id := idField.String()
return id
}
}
}
}
}
if identityName := s3_constants.GetIdentityNameFromContext(r); identityName != "" {
return identityName
}
if accountID := r.Header.Get(s3_constants.AmzAccountId); accountID != "" {
return accountID
}
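
The reflection walk in getAccountID reads Identity.Account.Id without importing the s3api package, which would otherwise create an import cycle. A toy illustration of the same pattern with stand-in types (not the real s3api structs):

package main

import (
	"fmt"
	"reflect"
)

type Account struct{ Id string }
type Identity struct{ Account *Account }

// accountID mirrors the pointer-deref, struct-field, and string-kind checks in getAccountID.
func accountID(v interface{}) string {
	val := reflect.ValueOf(v)
	if val.Kind() == reflect.Ptr {
		val = val.Elem()
	}
	if val.Kind() != reflect.Struct {
		return ""
	}
	acct := val.FieldByName("Account")
	if !acct.IsValid() || acct.IsNil() {
		return ""
	}
	if acct = acct.Elem(); acct.Kind() != reflect.Struct {
		return ""
	}
	id := acct.FieldByName("Id")
	if id.IsValid() && id.Kind() == reflect.String {
		return id.String()
	}
	return ""
}

func main() {
	fmt.Println(accountID(&Identity{Account: &Account{Id: "000000000000"}})) // 000000000000
}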

weed/s3api/s3tables/handler_namespace.go (5 changed lines)

@ -9,13 +9,16 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// handleCreateNamespace creates a new namespace in a table bucket
func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
glog.Errorf("S3Tables: handleCreateNamespace called")
var req CreateNamespaceRequest
if err := h.readRequestBody(r, &req); err != nil {
glog.Errorf("S3Tables: handleCreateNamespace failed to read request body: %v", err)
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
@ -83,12 +86,14 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
principal := h.getAccountID(r)
identityActions := getIdentityActions(r)
glog.Infof("S3Tables: CreateNamespace permission check - principal=%s, owner=%s, actions=%v", principal, bucketMetadata.OwnerAccountID, identityActions)
if !CheckPermissionWithContext("CreateNamespace", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableBucketTags: bucketTags,
IdentityActions: identityActions,
}) {
glog.Infof("S3Tables: Permission denied for CreateNamespace - principal=%s, owner=%s", principal, bucketMetadata.OwnerAccountID)
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace in this bucket")
return ErrAccessDenied
}

weed/s3api/s3tables/permissions.go (3 changed lines)

@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
@ -110,6 +111,8 @@ func CheckPermissionWithContext(operation, principal, owner, resourcePolicy, res
return true
}
glog.V(2).Infof("S3Tables: CheckPermission operation=%s principal=%s owner=%s", operation, principal, owner)
return checkPermission(operation, principal, owner, resourcePolicy, resourceARN, ctx)
}

weed/server/volume_grpc_admin.go (2 changed lines)

@ -273,7 +273,7 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
resp := &volume_server_pb.VolumeServerStatusResponse{
State: vs.store.State.Pb,
State: vs.store.State.Proto(),
MemoryStatus: stats.MemStat(),
Version: version.Version(),
DataCenter: vs.dataCenter,

weed/server/volume_grpc_scrub.go (18 changed lines)

@ -36,6 +36,8 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
switch m := req.GetMode(); m {
case volume_server_pb.VolumeScrubMode_INDEX:
files, serrs = v.CheckIndex()
case volume_server_pb.VolumeScrubMode_LOCAL:
files, serrs = scrubVolumeLocal(ctx, v)
case volume_server_pb.VolumeScrubMode_FULL:
files, serrs = scrubVolumeFull(ctx, v)
default:
@ -61,8 +63,12 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
return res, nil
}
func scrubVolumeLocal(ctx context.Context, v *storage.Volume) (int64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented")}
return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb.ScrubEcVolumeRequest) (*volume_server_pb.ScrubEcVolumeResponse, error) {
@ -94,6 +100,8 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
case volume_server_pb.VolumeScrubMode_INDEX:
// index scrubs do not verify individual EC shards
files, serrs = v.CheckIndex()
case volume_server_pb.VolumeScrubMode_LOCAL:
files, shardInfos, serrs = scrubEcVolumeLocal(ctx, v)
case volume_server_pb.VolumeScrubMode_FULL:
files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
default:
@ -121,6 +129,10 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
return res, nil
}
func scrubEcVolumeFull(ctx context.Context, ecv *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented")}
func scrubEcVolumeLocal(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
return 0, nil, []error{fmt.Errorf("scrubEcVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
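
For illustration only: a condensed, self-contained sketch of the mode dispatch the scrub handlers use, where the not-yet-implemented modes return explanatory errors rather than failing silently. ScrubMode and the scrub functions here are hypothetical stand-ins for the volume_server_pb.VolumeScrubMode enum and the handlers above.

package main

import (
	"errors"
	"fmt"
)

type ScrubMode int

const (
	ScrubIndex ScrubMode = iota
	ScrubLocal
	ScrubFull
)

func scrubIndex() (int64, []error) { return 42, nil }
func scrubLocal() (int64, []error) {
	return 0, []error{errors.New("local scrub: not implemented")}
}
func scrubFull() (int64, []error) {
	return 0, []error{errors.New("full scrub: not implemented")}
}

// scrub dispatches on the requested mode and reports an error slice so callers
// can aggregate per-volume failures instead of aborting on the first one.
func scrub(mode ScrubMode) (int64, []error) {
	switch mode {
	case ScrubIndex:
		return scrubIndex()
	case ScrubLocal:
		return scrubLocal()
	case ScrubFull:
		return scrubFull()
	default:
		return 0, []error{fmt.Errorf("unsupported scrub mode %d", mode)}
	}
}

func main() {
	files, errs := scrub(ScrubLocal)
	fmt.Println(files, errs) // 0 [local scrub: not implemented]
}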

6
weed/server/volume_grpc_state.go

@ -9,7 +9,7 @@ import (
// GetState returns a volume server's state flags.
func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetStateRequest) (*volume_server_pb.GetStateResponse, error) {
resp := &volume_server_pb.GetStateResponse{
State: vs.store.State.Pb,
State: vs.store.State.Proto(),
}
return resp, nil
@ -17,9 +17,9 @@ func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetS
// SetState updates state flags for volume servers.
func (vs *VolumeServer) SetState(ctx context.Context, req *volume_server_pb.SetStateRequest) (*volume_server_pb.SetStateResponse, error) {
err := vs.store.State.Update(req.State)
err := vs.store.State.Update(req.GetState())
resp := &volume_server_pb.SetStateResponse{
State: vs.store.State.Pb,
State: vs.store.State.Proto(),
}
return resp, err

2
weed/server/volume_server.go

@ -178,7 +178,7 @@ func (vs *VolumeServer) MaintenanceMode() bool {
if vs.store == nil {
return false
}
return vs.store.State.Pb.GetMaintenance()
return vs.store.State.Proto().GetMaintenance()
}
// Checks if a volume server is in maintenance mode, and returns an error explaining why.

7
weed/server/volume_server_test.go

@ -42,11 +42,8 @@ func TestMaintenanceMode(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
vs := VolumeServer{
store: &storage.Store{
Id: "test_1234",
State: &storage.State{
FilePath: "/some/path.pb",
Pb: tc.pb,
},
Id: "test_1234",
State: storage.NewStateFromProto("/some/path.pb", tc.pb),
},
}

4
weed/shell/command_ec_scrub.go

@ -49,7 +49,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
// TODO: switch default mode to LOCAL, once implemented.
mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
// TODO: add per-node parallelization
if err = volScrubCommand.Parse(args); err != nil {
@ -92,6 +92,8 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
switch strings.ToUpper(*mode) {
case "INDEX":
c.mode = volume_server_pb.VolumeScrubMode_INDEX
case "LOCAL":
c.mode = volume_server_pb.VolumeScrubMode_LOCAL
case "FULL":
c.mode = volume_server_pb.VolumeScrubMode_FULL
default:

4
weed/shell/command_volume_scrub.go

@ -50,7 +50,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
// TODO: switch default mode to LOCAL, once implemented.
mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
// TODO: add per-node parallelization
if err = volScrubCommand.Parse(args); err != nil {
@ -93,6 +93,8 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
switch strings.ToUpper(*mode) {
case "INDEX":
c.mode = volume_server_pb.VolumeScrubMode_INDEX
case "LOCAL":
c.mode = volume_server_pb.VolumeScrubMode_LOCAL
case "FULL":
c.mode = volume_server_pb.VolumeScrubMode_FULL
default:
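
For illustration only: a self-contained sketch of the case-insensitive mode parsing used by both scrub shell commands; ScrubMode and its values are hypothetical stand-ins for the volume_server_pb.VolumeScrubMode values selected above.

package main

import (
	"fmt"
	"strings"
)

type ScrubMode int

const (
	ScrubModeIndex ScrubMode = iota
	ScrubModeLocal
	ScrubModeFull
)

// parseScrubMode accepts the flag value in any case ("index", "INDEX", ...)
// and rejects anything outside the three supported modes.
func parseScrubMode(s string) (ScrubMode, error) {
	switch strings.ToUpper(s) {
	case "INDEX":
		return ScrubModeIndex, nil
	case "LOCAL":
		return ScrubModeLocal, nil
	case "FULL":
		return ScrubModeFull, nil
	default:
		return 0, fmt.Errorf("unknown scrub mode %q (expected index, local or full)", s)
	}
}

func main() {
	fmt.Println(parseScrubMode("local")) // 1 <nil>
	fmt.Println(parseScrubMode("bogus")) // 0 unknown scrub mode "bogus" (expected index, local or full)
}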

10
weed/storage/store.go

@ -160,9 +160,9 @@ func NewStore(
func (s *Store) LoadState() error {
err := s.State.Load()
if s.State.Pb != nil && err == nil {
if s.State.Proto() != nil && err == nil {
select {
case s.StateUpdateChan <- s.State.Pb:
case s.StateUpdateChan <- s.State.Proto():
default:
glog.V(2).Infof("StateUpdateChan full during LoadState, state will be reported in heartbeat")
}
@ -171,15 +171,15 @@ func (s *Store) LoadState() error {
}
func (s *Store) SaveState() error {
if s.State.Pb == nil {
if s.State.Proto() == nil {
glog.Warningf("tried to save empty state for store %s", s.Id)
return nil
}
err := s.State.Save()
if s.State.Pb != nil && err == nil {
if s.State.Proto() != nil && err == nil {
select {
case s.StateUpdateChan <- s.State.Pb:
case s.StateUpdateChan <- s.State.Proto():
default:
glog.V(2).Infof("StateUpdateChan full during SaveState, state will be reported in heartbeat")
}
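
For illustration only: the non-blocking send idiom that LoadState and SaveState rely on, sketched in isolation. If the buffered channel is full, the update is skipped here on the assumption that the state is reported later (the diff's log lines mention the heartbeat); the StateUpdate type below is a hypothetical stand-in.

package main

import "fmt"

type StateUpdate struct {
	Maintenance bool
}

// notify tries to queue an update without ever blocking the caller:
// the select's default branch drops the send when the buffer is full.
func notify(ch chan *StateUpdate, s *StateUpdate) bool {
	select {
	case ch <- s:
		return true // queued for the consumer loop
	default:
		return false // channel full; caller relies on a later report instead
	}
}

func main() {
	ch := make(chan *StateUpdate, 1)
	fmt.Println(notify(ch, &StateUpdate{Maintenance: true})) // true
	fmt.Println(notify(ch, &StateUpdate{}))                  // false: buffer already full
}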

93
weed/storage/store_state.go

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@ -17,70 +18,110 @@ const (
)
type State struct {
FilePath string
Pb *volume_server_pb.VolumeServerState
filePath string
pb *volume_server_pb.VolumeServerState
mu sync.Mutex
}
func NewState(dir string) (*State, error) {
state := &State{
FilePath: filepath.Join(dir, StateFileName),
Pb: nil,
filePath: filepath.Join(dir, StateFileName),
pb: nil,
}
err := state.Load()
return state, err
}
func NewStateFromProto(filePath string, state *volume_server_pb.VolumeServerState) *State {
pb := &volume_server_pb.VolumeServerState{}
proto.Merge(pb, state)
return &State{
filePath: filePath,
pb: pb,
}
}
func (st *State) Proto() *volume_server_pb.VolumeServerState {
st.mu.Lock()
defer st.mu.Unlock()
return st.pb
}
func (st *State) Load() error {
st.Pb = &volume_server_pb.VolumeServerState{}
st.mu.Lock()
defer st.mu.Unlock()
st.pb = &volume_server_pb.VolumeServerState{}
if !util.FileExists(st.FilePath) {
glog.V(1).Infof("No preexisting store state at %s", st.FilePath)
if !util.FileExists(st.filePath) {
glog.V(1).Infof("No preexisting store state at %s", st.filePath)
return nil
}
binPb, err := os.ReadFile(st.FilePath)
binPb, err := os.ReadFile(st.filePath)
if err != nil {
st.Pb = nil
return fmt.Errorf("failed to read store state from %s : %v", st.FilePath, err)
st.pb = nil
return fmt.Errorf("failed to read store state from %s : %v", st.filePath, err)
}
if err := proto.Unmarshal(binPb, st.Pb); err != nil {
st.Pb = nil
return fmt.Errorf("failed to parse store state from %s : %v", st.FilePath, err)
if err := proto.Unmarshal(binPb, st.pb); err != nil {
st.pb = nil
return fmt.Errorf("failed to parse store state from %s : %v", st.filePath, err)
}
glog.V(1).Infof("Got store state from %s: %v", st.FilePath, st.Pb)
glog.V(1).Infof("Got store state from %s: %v", st.filePath, st.pb)
return nil
}
func (st *State) Save() error {
if st.Pb == nil {
st.Pb = &volume_server_pb.VolumeServerState{}
func (st *State) save(locking bool) error {
if locking {
st.mu.Lock()
defer st.mu.Unlock()
}
if st.pb == nil {
st.pb = &volume_server_pb.VolumeServerState{}
}
binPb, err := proto.Marshal(st.Pb)
binPb, err := proto.Marshal(st.pb)
if err != nil {
return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err)
return fmt.Errorf("failed to serialize store state %v: %s", st.pb, err)
}
if err := util.WriteFile(st.FilePath, binPb, StateFileMode); err != nil {
return fmt.Errorf("failed to write store state to %s : %v", st.FilePath, err)
if err := util.WriteFile(st.filePath, binPb, StateFileMode); err != nil {
return fmt.Errorf("failed to write store state to %s : %v", st.filePath, err)
}
glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath)
glog.V(1).Infof("Saved store state %v to %s", st.pb, st.filePath)
return nil
}
func (st *State) Save() error {
return st.save(true)
}
func (st *State) Update(state *volume_server_pb.VolumeServerState) error {
st.mu.Lock()
defer st.mu.Unlock()
if state == nil {
return nil
}
if got, want := st.pb.GetVersion(), state.GetVersion(); got != want {
return fmt.Errorf("version mismatch for VolumeServerState (got %d, want %d)", got, want)
}
origState := st.pb
st.pb = &volume_server_pb.VolumeServerState{}
proto.Merge(st.pb, state)
st.pb.Version = st.pb.GetVersion() + 1
origState := st.Pb
st.Pb = state
err := st.Save()
err := st.save(false)
if err != nil {
// restore the original state upon save failures, to avoid skew between in-memory and disk state protos.
st.Pb = origState
st.pb = origState
}
return err
