diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml
index 6dff93e29..1b1d0763d 100644
--- a/.github/workflows/s3-tables-tests.yml
+++ b/.github/workflows/s3-tables-tests.yml
@@ -140,13 +140,16 @@ jobs:
       - name: Set up Docker
        uses: docker/setup-buildx-action@v3
 
+      - name: Pre-pull Trino image
+        run: docker pull trinodb/trino:479
+
       - name: Install SeaweedFS
        run: |
          go install -buildvcs=false ./weed
 
       - name: Run Trino Iceberg Catalog Integration Tests
        timeout-minutes: 25
-        working-directory: test/s3/catalog_trino
+        working-directory: test/s3tables/catalog_trino
        run: |
          set -x
          set -o pipefail
@@ -164,7 +167,7 @@ jobs:
 
       - name: Show test output on failure
        if: failure()
-        working-directory: test/s3/catalog_trino
+        working-directory: test/s3tables/catalog_trino
        run: |
          echo "=== Test Output ==="
          if [ -f test-output.log ]; then
@@ -179,7 +182,7 @@ jobs:
        uses: actions/upload-artifact@v6
        with:
          name: trino-iceberg-catalog-test-logs
-          path: test/s3/catalog_trino/test-output.log
+          path: test/s3tables/catalog_trino/test-output.log
          retention-days: 3
 
   s3-tables-build-verification:
diff --git a/k8s/charts/seaweedfs/templates/s3/s3-service.yaml b/k8s/charts/seaweedfs/templates/s3/s3-service.yaml
index 344677d88..1230a366d 100644
--- a/k8s/charts/seaweedfs/templates/s3/s3-service.yaml
+++ b/k8s/charts/seaweedfs/templates/s3/s3-service.yaml
@@ -16,6 +16,9 @@ metadata:
   {{- end }}
 spec:
   internalTrafficPolicy: {{ .Values.s3.internalTrafficPolicy | default "Cluster" }}
+  {{- if and (semverCompare ">=1.31-0" .Capabilities.KubeVersion.GitVersion) (or .Values.s3.trafficDistribution .Values.filer.s3.trafficDistribution) }}
+  trafficDistribution: {{ include "seaweedfs.trafficDistribution" . }}
+  {{- end }}
   ports:
     - name: "swfs-s3"
      port: {{ if .Values.s3.enabled }}{{ .Values.s3.port }}{{ else }}{{ .Values.filer.s3.port }}{{ end }}
diff --git a/k8s/charts/seaweedfs/templates/shared/_helpers.tpl b/k8s/charts/seaweedfs/templates/shared/_helpers.tpl
index 7a42c4704..280ed1657 100644
--- a/k8s/charts/seaweedfs/templates/shared/_helpers.tpl
+++ b/k8s/charts/seaweedfs/templates/shared/_helpers.tpl
@@ -323,3 +323,12 @@ Create the name of the service account to use
 {{- define "seaweedfs.serviceAccountName" -}}
 {{- .Values.global.serviceAccountName | default "seaweedfs" -}}
 {{- end -}}
+
+{{/* Generate a compatible trafficDistribution value, since "PreferClose" is deprecated as of k8s v1.35 */}}
+{{- define "seaweedfs.trafficDistribution" -}}
+{{- if .Values.s3.trafficDistribution -}}
+{{- and (eq .Values.s3.trafficDistribution "PreferClose") (semverCompare ">=1.35-0" .Capabilities.KubeVersion.GitVersion) | ternary "PreferSameZone" .Values.s3.trafficDistribution -}}
+{{- else if .Values.filer.s3.trafficDistribution -}}
+{{- and (eq .Values.filer.s3.trafficDistribution "PreferClose") (semverCompare ">=1.35-0" .Capabilities.KubeVersion.GitVersion) | ternary "PreferSameZone" .Values.filer.s3.trafficDistribution -}}
+{{- end -}}
+{{- end -}}
diff --git a/test/s3/policy/policy_test.go b/test/s3/policy/policy_test.go
index 2b181bf89..998bbabdc 100644
--- a/test/s3/policy/policy_test.go
+++ b/test/s3/policy/policy_test.go
@@ -16,22 +16,25 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/command"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
 	flag "github.com/seaweedfs/seaweedfs/weed/util/fla9"
 	"github.com/stretchr/testify/require"
 )
 
 // TestCluster manages the weed mini instance for integration testing
 type TestCluster struct {
-	dataDir    string
-	ctx        context.Context
-
cancel context.CancelFunc - isRunning bool - wg sync.WaitGroup - masterPort int - volumePort int - filerPort int - s3Port int - s3Endpoint string + dataDir string + ctx context.Context + cancel context.CancelFunc + isRunning bool + wg sync.WaitGroup + masterPort int + masterGrpcPort int + volumePort int + filerPort int + filerGrpcPort int + s3Port int + s3Endpoint string } func TestS3PolicyShellRevised(t *testing.T) { @@ -53,8 +56,8 @@ func TestS3PolicyShellRevised(t *testing.T) { require.NoError(t, tmpPolicyFile.Close()) weedCmd := "weed" - masterAddr := fmt.Sprintf("127.0.0.1:%d", cluster.masterPort) - filerAddr := fmt.Sprintf("127.0.0.1:%d", cluster.filerPort) + masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) + filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) // Put execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=testpolicy -file=%s", tmpPolicyFile.Name())) @@ -156,17 +159,16 @@ func findAvailablePort() (int, error) { // findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available func findAvailablePortPair() (int, int, error) { + httpPort, err := findAvailablePort() + if err != nil { + return 0, 0, err + } for i := 0; i < 100; i++ { - httpPort, err := findAvailablePort() + grpcPort, err := findAvailablePort() if err != nil { return 0, 0, err } - grpcPort := httpPort + 10000 - - // check if grpc port is available - listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) - if err == nil { - listener.Close() + if grpcPort != httpPort { return httpPort, grpcPort, nil } } @@ -188,14 +190,16 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { ctx, cancel := context.WithCancel(context.Background()) s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port) cluster := &TestCluster{ - dataDir: testDir, - ctx: ctx, - cancel: cancel, - masterPort: masterPort, - volumePort: volumePort, - filerPort: filerPort, - s3Port: s3Port, - s3Endpoint: s3Endpoint, + dataDir: testDir, + ctx: ctx, + cancel: cancel, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + volumePort: volumePort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + s3Port: s3Port, + s3Endpoint: s3Endpoint, } // Disable authentication for tests diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index 592526a93..df2a0bcd1 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -275,7 +275,7 @@ func TestIcebergNamespaces(t *testing.T) { env.StartSeaweedFS(t) // Create the default table bucket first via S3 - createTableBucket(t, env, "default") + createTableBucket(t, env, "warehouse") // Test GET /v1/namespaces (should return empty list initially) resp, err := http.Get(env.IcebergURL() + "/v1/namespaces") diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go new file mode 100644 index 000000000..e6bfac357 --- /dev/null +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -0,0 +1,536 @@ +package catalog_trino + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string 
+ dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + trinoContainer string + dockerAvailable bool + accessKey string + secretKey string +} + +func TestTrinoIcebergCatalog(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Trino integration test") + } + + fmt.Printf(">>> Starting SeaweedFS...\n") + env.StartSeaweedFS(t) + fmt.Printf(">>> SeaweedFS started.\n") + + catalogBucket := "warehouse" + tableBucket := "iceberg-tables" + fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) + createTableBucket(t, env, tableBucket) + fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket) + createTableBucket(t, env, catalogBucket) + fmt.Printf(">>> All buckets created.\n") + + // Test Iceberg REST API directly + testIcebergRestAPI(t, env) + + configDir := env.writeTrinoConfig(t, catalogBucket) + env.startTrinoContainer(t, configDir) + waitForTrino(t, env.trinoContainer, 60*time.Second) + + schemaName := "trino_" + randomString(6) + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) + output := runTrinoSQL(t, env.trinoContainer, "SHOW SCHEMAS FROM iceberg") + if !strings.Contains(output, schemaName) { + t.Fatalf("Expected schema %s in output:\n%s", schemaName, output) + } + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SHOW TABLES FROM iceberg.%s", schemaName)) +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); os.IsNotExist(err) { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-trino-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := findBindIP() + + masterPort, masterGrpcPort := mustFreePortPair(t, "Master") + volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") + filerPort, filerGrpcPort := mustFreePortPair(t, "Filer") + s3Port, s3GrpcPort := mustFreePortPair(t, "S3") + icebergPort := mustFreePort(t, "Iceberg") + + return &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + icebergPort: icebergPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + dockerAvailable: hasDocker(), + accessKey: "AKIAIOSFODNN7EXAMPLE", + secretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + } +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + // Create IAM config file + iamConfigPath := filepath.Join(env.dataDir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + 
"secretKey": "%s" + } + ], + "actions": [ + "Admin", + "Read", + "List", + "Tagging", + "Write" + ] + } + ] +}`, env.accessKey, env.secretKey) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Set AWS credentials in environment (for compatibility) + cmd.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + ) + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + // Try to check if Iceberg API is ready + // First try checking the /v1/config endpoint (requires auth, so will return 401 if server is up) + icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + if !env.waitForService(icebergURL, 30*time.Second) { + // Try to get more info about why it failed + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(icebergURL) + if err != nil { + t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err) + } else { + t.Logf("WARNING: Iceberg service returned status %d at %s", resp.StatusCode, icebergURL) + resp.Body.Close() + } + t.Fatalf("Iceberg REST API did not become ready") + } +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.trinoContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.trinoContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err != nil { + // Service not responding yet + time.Sleep(500 * time.Millisecond) + continue + } + statusCode := resp.StatusCode + resp.Body.Close() + // Accept 2xx status codes (successful responses) + if statusCode >= 200 && statusCode < 300 { + return true + } + // Also accept 401/403 (auth errors mean service is up, just needs credentials) + if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { + return true + } + // For other status codes, keep trying + time.Sleep(500 * time.Millisecond) + } + return false +} + +func testIcebergRestAPI(t 
*testing.T, env *TestEnvironment) { + t.Helper() + fmt.Printf(">>> Testing Iceberg REST API directly...\n") + + // First, verify the service is listening + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort)) + if err != nil { + t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err) + } + conn.Close() + t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort) + + // Test /v1/config endpoint + url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + t.Logf("Testing Iceberg REST API at %s", url) + + resp, err := http.Get(url) + if err != nil { + t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err) + } + defer resp.Body.Close() + + t.Logf("Iceberg REST API response status: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + t.Logf("Iceberg REST API response body: %s", string(body)) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected 200 OK from /v1/config, got %d", resp.StatusCode) + } +} + +func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket string) string { + t.Helper() + + configDir := filepath.Join(env.dataDir, "trino") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("Failed to create Trino config dir: %v", err) + } + + config := fmt.Sprintf(`connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://host.docker.internal:%d +iceberg.rest-catalog.warehouse=s3://%s/ +iceberg.file-format=PARQUET + +# S3 storage config +fs.native-s3.enabled=true +s3.endpoint=http://host.docker.internal:%d +s3.path-style-access=true +s3.signer-type=AwsS3V4Signer +s3.aws-access-key=%s +s3.aws-secret-key=%s +s3.region=us-west-2 + +# REST catalog authentication +iceberg.rest-catalog.security=SIGV4 +`, env.icebergPort, warehouseBucket, env.s3Port, env.accessKey, env.secretKey) + + if err := os.WriteFile(filepath.Join(configDir, "iceberg.properties"), []byte(config), 0644); err != nil { + t.Fatalf("Failed to write Trino config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startTrinoContainer(t *testing.T, configDir string) { + t.Helper() + + containerName := "seaweed-trino-" + randomString(8) + env.trinoContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-v", fmt.Sprintf("%s:/etc/trino/catalog", configDir), + "-v", fmt.Sprintf("%s:/test", env.dataDir), + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-west-2", + "trinodb/trino:479", + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Trino container: %v\n%s", err, string(output)) + } +} + +func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastOutput []byte + retryCount := 0 + for time.Now().Before(deadline) { + // Try system catalog query as a readiness check + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "system", "--schema", "runtime", + "--execute", "SELECT 1", + ) + if output, err := cmd.CombinedOutput(); err == nil { + return + } else { + lastOutput = output + outputStr := string(output) + if strings.Contains(outputStr, "No such container") || + strings.Contains(outputStr, "is not running") { + break + } + retryCount++ + } + time.Sleep(1 * time.Second) + } + + // If we can't connect to system catalog, try 
to at least connect to Trino server + cmd := exec.Command("docker", "exec", containerName, "trino", "--version") + if err := cmd.Run(); err == nil { + // Trino process is running, even if catalog isn't ready yet + // Give it a bit more time + time.Sleep(5 * time.Second) + return + } + + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) +} + +func runTrinoSQL(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) + } + return string(output) +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + // Use weed shell to create the table bucket + // Create with "000000000000" account ID (matches AccountAdmin.Id from auth_credentials.go) + // This ensures bucket owner matches authenticated identity's Account.Id + cmd := exec.Command(env.weedBinary, "shell", + fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + fmt.Printf(">>> EXECUTING: %v\n", cmd.Args) + output, err := cmd.CombinedOutput() + if err != nil { + fmt.Printf(">>> ERROR Output: %s\n", string(output)) + t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } + fmt.Printf(">>> SUCCESS: Created table bucket %s\n", bucketName) + + t.Logf("Created table bucket: %s", bucketName) +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + // Create an AWS S3 client with the test credentials pointing to our local server + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://%s:%d", env.bindIP, env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + // Create the bucket using standard S3 API + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("Failed to create object bucket %s: %v", bucketName, err) + } +} + +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + port, err := getFreePort() + if err != nil { + t.Fatalf("Failed to get free port for %s: %v", name, err) + } + return port +} + +func mustFreePortPair(t *testing.T, name string) (int, int) { + t.Helper() + + httpPort, grpcPort, err := findAvailablePortPair() + if err != nil { + t.Fatalf("Failed to get free port pair for %s: %v", name, err) + } + return httpPort, grpcPort +} + +func findAvailablePortPair() (int, int, error) { + httpPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + grpcPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + return httpPort, grpcPort, nil +} + +func getFreePort() (int, error) { + 
listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + return 0, err + } + defer listener.Close() + + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +} + +func findBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + if _, err := rand.Read(b); err != nil { + panic("failed to generate random string: " + err.Error()) + } + for i := range b { + b[i] = charset[int(b[i])%len(charset)] + } + return string(b) +} diff --git a/weed/command/mini.go b/weed/command/mini.go index 956872de6..278b8db6e 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -963,8 +963,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // Determine bind IP for health checks bindIp := getBindIp() - // Prepare master address - masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port) + // Prepare master address with gRPC port + masterAddr := string(pb.NewServerAddress(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc)) // Set admin options *miniAdminOptions.master = masterAddr diff --git a/weed/command/mount.go b/weed/command/mount.go index 7407ad908..b45233fb4 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -24,6 +24,7 @@ type MountOptions struct { cacheSizeMBForRead *int64 dataCenter *string allowOthers *bool + defaultPermissions *bool umaskString *string nonempty *bool volumeServerAccess *string @@ -86,6 +87,7 @@ func init() { mountOptions.cacheMetaTtlSec = cmdMount.Flag.Int("cacheMetaTtlSec", 60, "metadata cache validity seconds") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") + mountOptions.defaultPermissions = cmdMount.Flag.Bool("defaultPermissions", true, "enforce permissions by the operating system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory") mountOptions.volumeServerAccess = cmdMount.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 369f2e7c7..342f31009 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -188,6 +188,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { //SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability EnableAcl: true, } + if *option.defaultPermissions { + fuseMountOptions.Options = append(fuseMountOptions.Options, "default_permissions") + } if *option.nonempty { fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty") } @@ -216,8 +219,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024)) } - fuseMountOptions.EnableWriteback = *option.writebackCache - fuseMountOptions.EnableAsyncDio = *option.asyncDio + if option.writebackCache != 
nil { + fuseMountOptions.EnableWriteback = *option.writebackCache + } + if option.asyncDio != nil { + fuseMountOptions.EnableAsyncDio = *option.asyncDio + } if option.cacheSymlink != nil && *option.cacheSymlink { fuseMountOptions.EnableSymlinkCaching = true } diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 943b85519..88cadbb81 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -15,7 +15,7 @@ type ServerAddresses string type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { - if grpcPort == 0 || grpcPort == port+10000 { + if grpcPort == 0 { return ServerAddress(util.JoinHostPort(host, port)) } return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort)) @@ -25,10 +25,6 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress { if grpcPort == 0 { return ServerAddress(address) } - _, port, _ := hostAndPort(address) - if uint64(grpcPort) == port+10000 { - return ServerAddress(address) - } return ServerAddress(address + "." + strconv.Itoa(grpcPort)) } diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index 32fc67816..df961cedc 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -9,8 +9,10 @@ import "remote.proto"; // Persistent state for volume servers. message VolumeServerState { - // Whether the server is in maintenance (i.e. read-only) mode. + // whether the server is in maintenance (i.e. read-only) mode. bool maintenance = 1; + // incremental version counter + uint32 version = 2; } ////////////////////////////////////////////////// @@ -643,6 +645,7 @@ enum VolumeScrubMode { UNKNOWN = 0; INDEX = 1; FULL = 2; + LOCAL = 3; } message ScrubVolumeRequest { diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 52f79671e..9467d8d4a 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -28,6 +28,7 @@ const ( VolumeScrubMode_UNKNOWN VolumeScrubMode = 0 VolumeScrubMode_INDEX VolumeScrubMode = 1 VolumeScrubMode_FULL VolumeScrubMode = 2 + VolumeScrubMode_LOCAL VolumeScrubMode = 3 ) // Enum value maps for VolumeScrubMode. @@ -36,11 +37,13 @@ var ( 0: "UNKNOWN", 1: "INDEX", 2: "FULL", + 3: "LOCAL", } VolumeScrubMode_value = map[string]int32{ "UNKNOWN": 0, "INDEX": 1, "FULL": 2, + "LOCAL": 3, } ) @@ -74,8 +77,10 @@ func (VolumeScrubMode) EnumDescriptor() ([]byte, []int) { // Persistent state for volume servers. type VolumeServerState struct { state protoimpl.MessageState `protogen:"open.v1"` - // Whether the server is in maintenance (i.e. read-only) mode. - Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"` + // whether the server is in maintenance (i.e. read-only) mode. 
+ Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"` + // incremental version counter + Version uint32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -117,6 +122,13 @@ func (x *VolumeServerState) GetMaintenance() bool { return false } +func (x *VolumeServerState) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + type BatchDeleteRequest struct { state protoimpl.MessageState `protogen:"open.v1"` FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"` @@ -1855,7 +1867,7 @@ func (x *GetStateResponse) GetState() *VolumeServerState { type SetStateRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // SetState updates *all* volume server flags at once. Retrieve state with GetState(), + // SetState updates *all* volume server flags at once. Retrieve state/version with GetState(), // modify individual flags as required, then call this RPC to update. State *VolumeServerState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` unknownFields protoimpl.UnknownFields @@ -6687,9 +6699,10 @@ var File_volume_server_proto protoreflect.FileDescriptor const file_volume_server_proto_rawDesc = "" + "\n" + - "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"5\n" + + "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"O\n" + "\x11VolumeServerState\x12 \n" + - "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\"[\n" + + "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\x12\x18\n" + + "\aversion\x18\x02 \x01(\rR\aversion\"[\n" + "\x12BatchDeleteRequest\x12\x19\n" + "\bfile_ids\x18\x01 \x03(\tR\afileIds\x12*\n" + "\x11skip_cookie_check\x18\x02 \x01(\bR\x0fskipCookieCheck\"O\n" + @@ -7180,11 +7193,12 @@ const file_volume_server_proto_rawDesc = "" + "\rstart_time_ns\x18\x01 \x01(\x03R\vstartTimeNs\x12$\n" + "\x0eremote_time_ns\x18\x02 \x01(\x03R\fremoteTimeNs\x12 \n" + "\fstop_time_ns\x18\x03 \x01(\x03R\n" + - "stopTimeNs*3\n" + + "stopTimeNs*>\n" + "\x0fVolumeScrubMode\x12\v\n" + "\aUNKNOWN\x10\x00\x12\t\n" + "\x05INDEX\x10\x01\x12\b\n" + - "\x04FULL\x10\x022\xfb(\n" + + "\x04FULL\x10\x02\x12\t\n" + + "\x05LOCAL\x10\x032\xfb(\n" + "\fVolumeServer\x12\\\n" + "\vBatchDelete\x12$.volume_server_pb.BatchDeleteRequest\x1a%.volume_server_pb.BatchDeleteResponse\"\x00\x12n\n" + "\x11VacuumVolumeCheck\x12*.volume_server_pb.VacuumVolumeCheckRequest\x1a+.volume_server_pb.VacuumVolumeCheckResponse\"\x00\x12v\n" + diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0a36c39a8..d305f8b46 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -538,17 +538,45 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3 } iam.m.Lock() + // Save existing environment-based identities before replacement + // This ensures AWS_ACCESS_KEY_ID credentials are preserved + envIdentities := make([]*Identity, 0) + for _, ident := range iam.identities { + if ident.IsStatic && strings.HasPrefix(ident.Name, "admin-") { + // This is an environment-based admin identity, preserve it + envIdentities = append(envIdentities, ident) + } + } + // atomically switch iam.identities = identities iam.identityAnonymous = identityAnonymous iam.accounts = accounts iam.emailAccount = emailAccount - iam.accessKeyIdent = accessKeyIdent iam.nameToIdentity = nameToIdentity + iam.accessKeyIdent = accessKeyIdent iam.policies = 
policies
+
+	// Re-add environment-based identities that were preserved
+	for _, envIdent := range envIdentities {
+		// Check if this identity already exists in the new config
+		exists := false
+		for _, ident := range iam.identities {
+			if ident.Name == envIdent.Name {
+				exists = true
+				break
+			}
+		}
+		if !exists {
+			iam.identities = append(iam.identities, envIdent)
+			iam.accessKeyIdent[envIdent.Credentials[0].AccessKey] = envIdent
+			iam.nameToIdentity[envIdent.Name] = envIdent
+		}
+	}
+
 	// Update authentication state based on whether identities exist
 	// Once enabled, keep it enabled (one-way toggle)
-	authJustEnabled := iam.updateAuthenticationState(len(identities))
+	authJustEnabled := iam.updateAuthenticationState(len(iam.identities))
 	iam.m.Unlock()
 
 	if authJustEnabled {
@@ -778,9 +806,9 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap
 	iam.identityAnonymous = identityAnonymous
 	iam.accounts = accounts
 	iam.emailAccount = emailAccount
-	iam.accessKeyIdent = accessKeyIdent
 	iam.nameToIdentity = nameToIdentity
+	iam.accessKeyIdent = accessKeyIdent
 	iam.policies = policies
 	// Update authentication state based on whether identities exist
 	// Once enabled, keep it enabled (one-way toggle)
 	authJustEnabled := iam.updateAuthenticationState(len(identities))
diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go
index e5960c76c..f3dc4c7c9 100644
--- a/weed/s3api/auth_signature_v4.go
+++ b/weed/s3api/auth_signature_v4.go
@@ -22,6 +22,7 @@ import (
 	"crypto/hmac"
 	"crypto/sha256"
 	"crypto/subtle"
+	"encoding/base64"
 	"encoding/hex"
 	"io"
 	"net"
@@ -104,6 +105,24 @@ func getContentSha256Cksum(r *http.Request) string {
 	return emptySHA256
 }
 
+// normalizePayloadHash converts a base64-encoded payload hash to hex format.
+// AWS SigV4 canonical requests always use hex-encoded SHA256.
+func normalizePayloadHash(payloadHashValue string) string {
+	// Special values and hex-encoded hashes don't need conversion
+	if payloadHashValue == emptySHA256 || payloadHashValue == unsignedPayload ||
+		payloadHashValue == streamingContentSHA256 || payloadHashValue == streamingContentSHA256Trailer ||
+		payloadHashValue == streamingUnsignedPayload || len(payloadHashValue) == 64 {
+		return payloadHashValue
+	}
+
+	// Try to decode as base64 and convert to hex
+	if decodedBytes, err := base64.StdEncoding.DecodeString(payloadHashValue); err == nil && len(decodedBytes) == 32 {
+		return hex.EncodeToString(decodedBytes)
+	}
+
+	return payloadHashValue
+}
+
 // signValues data type represents structured form of AWS Signature V4 header.
type signValues struct { Credential credentialHeader @@ -485,6 +504,10 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) } } + // Normalize payload hash to hex format for canonical request + // AWS SigV4 canonical requests always use hex-encoded SHA256 + normalizedPayload := normalizePayloadHash(hashedPayload) + return &v4AuthInfo{ Signature: signV4Values.Signature, AccessKey: signV4Values.Credential.accessKey, @@ -493,7 +516,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) Region: signV4Values.Credential.scope.region, Service: signV4Values.Credential.scope.service, Scope: signV4Values.Credential.getScope(), - HashedPayload: hashedPayload, + HashedPayload: normalizedPayload, IsPresigned: false, }, s3err.ErrNone } diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 8122b4627..f0e454a51 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -18,37 +18,11 @@ import ( "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" ) -func (s *Server) checkAuth(w http.ResponseWriter, r *http.Request, action s3api.Action, bucketName string) bool { - identityName := s3_constants.GetIdentityNameFromContext(r) - if identityName == "" { - writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required") - return false - } - - identityObj := s3_constants.GetIdentityFromContext(r) - if identityObj == nil { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: missing identity") - return false - } - identity, ok := identityObj.(*s3api.Identity) - if !ok { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: invalid identity") - return false - } - - if !identity.CanDo(action, bucketName, "") { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied") - return false - } - return true -} - // FilerClient provides access to the filer for storage operations. type FilerClient interface { WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error @@ -79,17 +53,20 @@ func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server { // RegisterRoutes registers Iceberg REST API routes on the provided router. 
func (s *Server) RegisterRoutes(router *mux.Router) { - // Configuration endpoint - router.HandleFunc("/v1/config", s.Auth(s.handleConfig)).Methods(http.MethodGet) + // Add middleware to log all requests/responses + router.Use(loggingMiddleware) + + // Configuration endpoint - no auth needed for config + router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet) - // Namespace endpoints + // Namespace endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete) - // Table endpoints + // Table endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet) @@ -97,7 +74,7 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - // With prefix support + // With prefix support - wrapped with Auth middleware router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) @@ -110,7 +87,48 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - glog.V(0).Infof("Registered Iceberg REST Catalog routes") + // Catch-all for debugging + router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI) + writeError(w, http.StatusNotFound, "NotFound", "Path not found") + }) + + glog.V(2).Infof("Registered Iceberg REST Catalog routes") +} + +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr) + + // Log all headers for debugging + glog.V(2).Infof("Iceberg REST headers:") + for name, values := range r.Header { + for _, value := range values { + // Redact sensitive headers + if name == "Authorization" && len(value) > 20 { + glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:]) + } else { + glog.V(2).Infof(" %s: %s", name, value) + } + } + } + + // Create a response writer that captures the status code + wrapped := &responseWriter{ResponseWriter: w} + next.ServeHTTP(wrapped, r) + + glog.V(2).Infof("Iceberg REST response: 
%s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode) + }) +} + +type responseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *responseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) } func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc { @@ -293,8 +311,8 @@ func getBucketFromPrefix(r *http.Request) string { if prefix := vars["prefix"]; prefix != "" { return prefix } - // Default bucket if no prefix - return "default" + // Default bucket if no prefix - use "warehouse" for Iceberg + return "warehouse" } // buildTableBucketARN builds an ARN for a table bucket. @@ -305,25 +323,28 @@ func buildTableBucketARN(bucketName string) string { // handleConfig returns catalog configuration. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { - bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } + glog.Infof("handleConfig: START") + glog.Infof("handleConfig: setting Content-Type header") + w.Header().Set("Content-Type", "application/json") config := CatalogConfig{ Defaults: map[string]string{}, Overrides: map[string]string{}, } - writeJSON(w, http.StatusOK, config) + glog.Infof("handleConfig: encoding JSON") + if err := json.NewEncoder(w).Encode(config); err != nil { + glog.Warningf("handleConfig: Failed to encode config: %v", err) + } + glog.Infof("handleConfig: COMPLETE") } // handleListNamespaces lists namespaces in a catalog. func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to list namespaces var resp s3tables.ListNamespacesResponse req := &s3tables.ListNamespacesRequest{ @@ -333,11 +354,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName) }) if err != nil { - glog.V(1).Infof("Iceberg: ListNamespaces error: %v", err) + glog.Infof("Iceberg: ListNamespaces error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -357,11 +378,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { // handleCreateNamespace creates a new namespace. 
func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + var req CreateNamespaceRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body") @@ -382,15 +403,18 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, "") + glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName) + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName) }) if err != nil { + glog.Errorf("Iceberg: handleCreateNamespace error: %v", err) + if strings.Contains(err.Error(), "already exists") { writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) return } - glog.V(1).Infof("Iceberg: CreateNamespace error: %v", err) + glog.Infof("Iceberg: CreateNamespace error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -418,11 +442,11 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to get namespace getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, @@ -432,7 +456,7 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -462,11 +486,11 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -475,7 +499,7 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -500,11 +524,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { 
- return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -512,10 +536,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchNamespaceException", fmt.Sprintf("Namespace does not exist: %v", namespace)) return @@ -542,11 +567,11 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + listReq := &s3tables.ListTablesRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -556,7 +581,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName) }) if err != nil { @@ -605,11 +630,11 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Generate UUID for the new table tableUUID := uuid.New() location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name) @@ -657,7 +682,7 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, identityName) }) if err != nil { @@ -696,11 +721,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -710,10 +735,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return 
s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName)) return @@ -771,11 +797,11 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -785,7 +811,7 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -807,11 +833,11 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -820,7 +846,7 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, identityName) }) if err != nil { @@ -849,9 +875,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } + bucketARN := buildTableBucketARN(bucketName) + + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) // Parse the commit request var req CommitTableRequest @@ -860,8 +887,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { return } - bucketARN := buildTableBucketARN(bucketName) - // First, load current table metadata getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -872,7 +897,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -985,7 +1010,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // 1. Write metadata file (this would normally be an S3 PutObject, // but s3tables manager handles the metadata storage logic) // For now, we assume s3tables.UpdateTable handles the reference update. 
- return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName) }) if err != nil { @@ -1002,14 +1027,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, result) } -// loadTableResultJSON is used for JSON serialization of LoadTableResult. -// It wraps table.Metadata (which is an interface) for proper JSON output. -type loadTableResultJSON struct { - MetadataLocation string `json:"metadata-location,omitempty"` - Metadata table.Metadata `json:"metadata"` - Config iceberg.Properties `json:"config,omitempty"` -} - // newTableMetadata creates a new table.Metadata object with the given parameters. // Uses iceberg-go's MetadataBuilder pattern for proper spec compliance. func newTableMetadata( diff --git a/weed/s3api/s3api_tables.go b/weed/s3api/s3api_tables.go index 54177c4d9..b93d5879f 100644 --- a/weed/s3api/s3api_tables.go +++ b/weed/s3api/s3api_tables.go @@ -632,6 +632,7 @@ func buildUntagResourceRequest(r *http.Request) (interface{}, error) { // which performs granular permission checks based on the specific operation. func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("S3Tables: authenticateS3Tables called, iam.isEnabled()=%t", s3a.iam.isEnabled()) if !s3a.iam.isEnabled() { f(w, r) return @@ -640,15 +641,19 @@ func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFun // Use AuthSignatureOnly to authenticate the request without authorizing specific actions identity, errCode := s3a.iam.AuthSignatureOnly(r) if errCode != s3err.ErrNone { + glog.Errorf("S3Tables: AuthSignatureOnly failed: %v", errCode) s3err.WriteErrorResponse(w, r, errCode) return } // Store the authenticated identity in request context if identity != nil && identity.Name != "" { + glog.V(2).Infof("S3Tables: authenticated identity Name=%s Account.Id=%s", identity.Name, identity.Account.Id) ctx := s3_constants.SetIdentityNameInContext(r.Context(), identity.Name) ctx = s3_constants.SetIdentityInContext(ctx, identity) r = r.WithContext(ctx) + } else { + glog.V(2).Infof("S3Tables: authenticated identity is nil or empty name") } f(w, r) diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index dae11d562..39b10ce0e 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -164,9 +164,32 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request, // This is also used as the principal for permission checks, ensuring alignment between // the caller identity and ownership verification when IAM is enabled. 
func (h *S3TablesHandler) getAccountID(r *http.Request) string { + identityRaw := s3_constants.GetIdentityFromContext(r) + if identityRaw != nil { + // Use reflection to access the Account.Id field to avoid import cycle + val := reflect.ValueOf(identityRaw) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + if val.Kind() == reflect.Struct { + accountField := val.FieldByName("Account") + if accountField.IsValid() && !accountField.IsNil() { + accountVal := accountField.Elem() + if accountVal.Kind() == reflect.Struct { + idField := accountVal.FieldByName("Id") + if idField.IsValid() && idField.Kind() == reflect.String { + id := idField.String() + return id + } + } + } + } + } + if identityName := s3_constants.GetIdentityNameFromContext(r); identityName != "" { return identityName } + if accountID := r.Header.Get(s3_constants.AmzAccountId); accountID != "" { return accountID } diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index a732edc6e..492a53241 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -9,13 +9,16 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) // handleCreateNamespace creates a new namespace in a table bucket func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + glog.Errorf("S3Tables: handleCreateNamespace called") var req CreateNamespaceRequest if err := h.readRequestBody(r, &req); err != nil { + glog.Errorf("S3Tables: handleCreateNamespace failed to read request body: %v", err) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } @@ -83,12 +86,14 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName) principal := h.getAccountID(r) identityActions := getIdentityActions(r) + glog.Infof("S3Tables: CreateNamespace permission check - principal=%s, owner=%s, actions=%v", principal, bucketMetadata.OwnerAccountID, identityActions) if !CheckPermissionWithContext("CreateNamespace", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{ TableBucketName: bucketName, Namespace: namespaceName, TableBucketTags: bucketTags, IdentityActions: identityActions, }) { + glog.Infof("S3Tables: Permission denied for CreateNamespace - principal=%s, owner=%s", principal, bucketMetadata.OwnerAccountID) h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace in this bucket") return ErrAccessDenied } diff --git a/weed/s3api/s3tables/permissions.go b/weed/s3api/s3tables/permissions.go index af28c0a88..17d3b1a04 100644 --- a/weed/s3api/s3tables/permissions.go +++ b/weed/s3api/s3tables/permissions.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" ) @@ -110,6 +111,8 @@ func CheckPermissionWithContext(operation, principal, owner, resourcePolicy, res return true } + glog.V(2).Infof("S3Tables: CheckPermission operation=%s principal=%s owner=%s", operation, principal, owner) + return checkPermission(operation, principal, owner, resourcePolicy, resourceARN, ctx) } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 584b1a1d2..963ec8773 100644 --- 
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 584b1a1d2..963ec8773 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -273,7 +273,7 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.
 func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
 	resp := &volume_server_pb.VolumeServerStatusResponse{
-		State:        vs.store.State.Pb,
+		State:        vs.store.State.Proto(),
 		MemoryStatus: stats.MemStat(),
 		Version:      version.Version(),
 		DataCenter:   vs.dataCenter,
diff --git a/weed/server/volume_grpc_scrub.go b/weed/server/volume_grpc_scrub.go
index 9d8d42582..43e33f2d4 100644
--- a/weed/server/volume_grpc_scrub.go
+++ b/weed/server/volume_grpc_scrub.go
@@ -36,6 +36,8 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
 	switch m := req.GetMode(); m {
 	case volume_server_pb.VolumeScrubMode_INDEX:
 		files, serrs = v.CheckIndex()
+	case volume_server_pb.VolumeScrubMode_LOCAL:
+		files, serrs = scrubVolumeLocal(ctx, v)
 	case volume_server_pb.VolumeScrubMode_FULL:
 		files, serrs = scrubVolumeFull(ctx, v)
 	default:
@@ -61,8 +63,12 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
 	return res, nil
 }
 
+func scrubVolumeLocal(ctx context.Context, v *storage.Volume) (int64, []error) {
+	return 0, []error{fmt.Errorf("scrubVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
+}
+
 func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) {
-	return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented")}
+	return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
 }
 
 func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb.ScrubEcVolumeRequest) (*volume_server_pb.ScrubEcVolumeResponse, error) {
@@ -94,6 +100,8 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
 	case volume_server_pb.VolumeScrubMode_INDEX:
 		// index scrubs do not verify individual EC shards
 		files, serrs = v.CheckIndex()
+	case volume_server_pb.VolumeScrubMode_LOCAL:
+		files, shardInfos, serrs = scrubEcVolumeLocal(ctx, v)
 	case volume_server_pb.VolumeScrubMode_FULL:
 		files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
 	default:
@@ -121,6 +129,10 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
 	return res, nil
 }
 
-func scrubEcVolumeFull(ctx context.Context, ecv *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
-	return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented")}
+func scrubEcVolumeLocal(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
+	return 0, nil, []error{fmt.Errorf("scrubEcVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
+}
+
+func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
+	return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
 }
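Note: the scrub hunks above add a LOCAL mode to the existing INDEX/FULL switch, with the LOCAL and FULL implementations still stubbed out as "not implemented" errors. A toy sketch of that dispatch shape; the type, constants and counts here are illustrative, not the volume_server_pb enum:

package main

import (
	"errors"
	"fmt"
)

type scrubMode int

const (
	modeIndex scrubMode = iota
	modeLocal
	modeFull
)

// scrub dispatches on the requested mode; INDEX is implemented, the others
// return an error slice just like the stubbed scrubVolumeLocal/Full above.
func scrub(mode scrubMode) (files int64, errs []error) {
	switch mode {
	case modeIndex:
		return 42, nil // pretend the index check visited 42 needles
	case modeLocal:
		return 0, []error{errors.New("local scrub: not implemented")}
	case modeFull:
		return 0, []error{errors.New("full scrub: not implemented")}
	default:
		return 0, []error{fmt.Errorf("unsupported scrub mode %d", mode)}
	}
}

func main() {
	for _, m := range []scrubMode{modeIndex, modeLocal, modeFull} {
		files, errs := scrub(m)
		fmt.Printf("mode=%d files=%d errs=%v\n", m, files, errs)
	}
}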
diff --git a/weed/server/volume_grpc_state.go b/weed/server/volume_grpc_state.go
index 0f5f0e92e..52bc823d4 100644
--- a/weed/server/volume_grpc_state.go
+++ b/weed/server/volume_grpc_state.go
@@ -9,7 +9,7 @@ import (
 // GetState returns a volume server's state flags.
 func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetStateRequest) (*volume_server_pb.GetStateResponse, error) {
 	resp := &volume_server_pb.GetStateResponse{
-		State: vs.store.State.Pb,
+		State: vs.store.State.Proto(),
 	}
 
 	return resp, nil
@@ -17,9 +17,9 @@ func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetS
 
 // SetState updates state flags for volume servers.
 func (vs *VolumeServer) SetState(ctx context.Context, req *volume_server_pb.SetStateRequest) (*volume_server_pb.SetStateResponse, error) {
-	err := vs.store.State.Update(req.State)
+	err := vs.store.State.Update(req.GetState())
 	resp := &volume_server_pb.SetStateResponse{
-		State: vs.store.State.Pb,
+		State: vs.store.State.Proto(),
 	}
 
 	return resp, err
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 23676c3f8..6bc3a6898 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -178,7 +178,7 @@ func (vs *VolumeServer) MaintenanceMode() bool {
 	if vs.store == nil {
 		return false
 	}
-	return vs.store.State.Pb.GetMaintenance()
+	return vs.store.State.Proto().GetMaintenance()
 }
 
 // Checks if a volume server is in maintenance mode, and returns an error explaining why.
diff --git a/weed/server/volume_server_test.go b/weed/server/volume_server_test.go
index 1115a4b4a..ac1ad774e 100644
--- a/weed/server/volume_server_test.go
+++ b/weed/server/volume_server_test.go
@@ -42,11 +42,8 @@ func TestMaintenanceMode(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			vs := VolumeServer{
 				store: &storage.Store{
-					Id: "test_1234",
-					State: &storage.State{
-						FilePath: "/some/path.pb",
-						Pb:       tc.pb,
-					},
+					Id:    "test_1234",
+					State: storage.NewStateFromProto("/some/path.pb", tc.pb),
 				},
 			}
 
diff --git a/weed/shell/command_ec_scrub.go b/weed/shell/command_ec_scrub.go
index c4bac0f8e..587d5e5ef 100644
--- a/weed/shell/command_ec_scrub.go
+++ b/weed/shell/command_ec_scrub.go
@@ -49,7 +49,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
 	nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server : (optional)")
 	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
 	// TODO: switch default mode to LOCAL, once implemented.
-	mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
+	mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
 	// TODO: add per-node parallelization
 
 	if err = volScrubCommand.Parse(args); err != nil {
@@ -92,6 +92,8 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
 	switch strings.ToUpper(*mode) {
 	case "INDEX":
 		c.mode = volume_server_pb.VolumeScrubMode_INDEX
+	case "LOCAL":
+		c.mode = volume_server_pb.VolumeScrubMode_LOCAL
 	case "FULL":
 		c.mode = volume_server_pb.VolumeScrubMode_FULL
 	default:
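Note: both scrub shell commands above now accept the mode flag case-insensitively, upper-casing it before the switch so "index", "local" and "full" in any case map to the protobuf enum, and unknown values fall through to the default branch. A small hypothetical helper showing the same parsing behavior:

package main

import (
	"fmt"
	"strings"
)

// parseScrubMode normalizes the flag case-insensitively and rejects
// anything outside the three supported modes.
func parseScrubMode(s string) (string, error) {
	switch strings.ToUpper(s) {
	case "INDEX", "LOCAL", "FULL":
		return strings.ToUpper(s), nil
	default:
		return "", fmt.Errorf("unknown scrub mode %q (want index/local/full)", s)
	}
}

func main() {
	for _, in := range []string{"index", "Local", "FULL", "bogus"} {
		mode, err := parseScrubMode(in)
		fmt.Println(in, "->", mode, err)
	}
}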
diff --git a/weed/shell/command_volume_scrub.go b/weed/shell/command_volume_scrub.go
index a3d90924f..c8a43729f 100644
--- a/weed/shell/command_volume_scrub.go
+++ b/weed/shell/command_volume_scrub.go
@@ -50,7 +50,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
 	nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server : (optional)")
 	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
 	// TODO: switch default mode to LOCAL, once implemented.
-	mode := volScrubCommand.String("mode", "INDEX", "scrubbing mode (INDEX/FULL)")
+	mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
 	// TODO: add per-node parallelization
 
 	if err = volScrubCommand.Parse(args); err != nil {
@@ -93,6 +93,8 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
 	switch strings.ToUpper(*mode) {
 	case "INDEX":
 		c.mode = volume_server_pb.VolumeScrubMode_INDEX
+	case "LOCAL":
+		c.mode = volume_server_pb.VolumeScrubMode_LOCAL
 	case "FULL":
 		c.mode = volume_server_pb.VolumeScrubMode_FULL
 	default:
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 389980667..3e5bcd3cf 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -160,9 +160,9 @@ func NewStore(
 func (s *Store) LoadState() error {
 	err := s.State.Load()
 
-	if s.State.Pb != nil && err == nil {
+	if s.State.Proto() != nil && err == nil {
 		select {
-		case s.StateUpdateChan <- s.State.Pb:
+		case s.StateUpdateChan <- s.State.Proto():
 		default:
 			glog.V(2).Infof("StateUpdateChan full during LoadState, state will be reported in heartbeat")
 		}
@@ -171,15 +171,15 @@ func (s *Store) LoadState() error {
 }
 
 func (s *Store) SaveState() error {
-	if s.State.Pb == nil {
+	if s.State.Proto() == nil {
 		glog.Warningf("tried to save empty state for store %s", s.Id)
 		return nil
 	}
 
 	err := s.State.Save()
-	if s.State.Pb != nil && err == nil {
+	if s.State.Proto() != nil && err == nil {
 		select {
-		case s.StateUpdateChan <- s.State.Pb:
+		case s.StateUpdateChan <- s.State.Proto():
 		default:
 			glog.V(2).Infof("StateUpdateChan full during SaveState, state will be reported in heartbeat")
 		}
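Note: LoadState and SaveState above publish the freshly loaded or saved state on StateUpdateChan with a select/default, so a full channel never blocks the caller; the dropped update is simply reported by the next heartbeat. A minimal sketch of that non-blocking send pattern, using an illustrative channel rather than the actual Store fields:

package main

import "fmt"

// tryNotify mirrors the non-blocking send in LoadState/SaveState: if the
// channel is full, the update is dropped instead of blocking the caller.
func tryNotify(ch chan string, state string) bool {
	select {
	case ch <- state:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(tryNotify(ch, "v1")) // true, the buffered slot was free
	fmt.Println(tryNotify(ch, "v2")) // false, channel full; update dropped
}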
diff --git a/weed/storage/store_state.go b/weed/storage/store_state.go
index 7b0a2fc3a..2bac4fae6 100644
--- a/weed/storage/store_state.go
+++ b/weed/storage/store_state.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -17,70 +18,110 @@ const (
 )
 
 type State struct {
-	FilePath string
-	Pb       *volume_server_pb.VolumeServerState
+	filePath string
+	pb       *volume_server_pb.VolumeServerState
+
+	mu sync.Mutex
 }
 
 func NewState(dir string) (*State, error) {
 	state := &State{
-		FilePath: filepath.Join(dir, StateFileName),
-		Pb:       nil,
+		filePath: filepath.Join(dir, StateFileName),
+		pb:       nil,
 	}
 
 	err := state.Load()
 	return state, err
 }
 
+func NewStateFromProto(filePath string, state *volume_server_pb.VolumeServerState) *State {
+	pb := &volume_server_pb.VolumeServerState{}
+	proto.Merge(pb, state)
+
+	return &State{
+		filePath: filePath,
+		pb:       pb,
+	}
+}
+
+func (st *State) Proto() *volume_server_pb.VolumeServerState {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+
+	return st.pb
+}
+
 func (st *State) Load() error {
-	st.Pb = &volume_server_pb.VolumeServerState{}
+	st.mu.Lock()
+	defer st.mu.Unlock()
+
+	st.pb = &volume_server_pb.VolumeServerState{}
 
-	if !util.FileExists(st.FilePath) {
-		glog.V(1).Infof("No preexisting store state at %s", st.FilePath)
+	if !util.FileExists(st.filePath) {
+		glog.V(1).Infof("No preexisting store state at %s", st.filePath)
 		return nil
 	}
 
-	binPb, err := os.ReadFile(st.FilePath)
+	binPb, err := os.ReadFile(st.filePath)
 	if err != nil {
-		st.Pb = nil
-		return fmt.Errorf("failed to read store state from %s : %v", st.FilePath, err)
+		st.pb = nil
+		return fmt.Errorf("failed to read store state from %s : %v", st.filePath, err)
 	}
-	if err := proto.Unmarshal(binPb, st.Pb); err != nil {
-		st.Pb = nil
-		return fmt.Errorf("failed to parse store state from %s : %v", st.FilePath, err)
+	if err := proto.Unmarshal(binPb, st.pb); err != nil {
+		st.pb = nil
+		return fmt.Errorf("failed to parse store state from %s : %v", st.filePath, err)
 	}
 
-	glog.V(1).Infof("Got store state from %s: %v", st.FilePath, st.Pb)
+	glog.V(1).Infof("Got store state from %s: %v", st.filePath, st.pb)
 	return nil
 }
 
-func (st *State) Save() error {
-	if st.Pb == nil {
-		st.Pb = &volume_server_pb.VolumeServerState{}
+func (st *State) save(locking bool) error {
+	if locking {
+		st.mu.Lock()
+		defer st.mu.Unlock()
+	}
+
+	if st.pb == nil {
+		st.pb = &volume_server_pb.VolumeServerState{}
 	}
 
-	binPb, err := proto.Marshal(st.Pb)
+	binPb, err := proto.Marshal(st.pb)
 	if err != nil {
-		return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err)
+		return fmt.Errorf("failed to serialize store state %v: %s", st.pb, err)
 	}
-	if err := util.WriteFile(st.FilePath, binPb, StateFileMode); err != nil {
-		return fmt.Errorf("failed to write store state to %s : %v", st.FilePath, err)
+	if err := util.WriteFile(st.filePath, binPb, StateFileMode); err != nil {
+		return fmt.Errorf("failed to write store state to %s : %v", st.filePath, err)
 	}
 
-	glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath)
+	glog.V(1).Infof("Saved store state %v to %s", st.pb, st.filePath)
 	return nil
 }
 
+func (st *State) Save() error {
+	return st.save(true)
+}
+
 func (st *State) Update(state *volume_server_pb.VolumeServerState) error {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+
 	if state == nil {
 		return nil
 	}
 
+	if got, want := st.pb.GetVersion(), state.GetVersion(); got != want {
+		return fmt.Errorf("version mismatch for VolumeServerState (got %d, want %d)", got, want)
+	}
+
+	origState := st.pb
+	st.pb = &volume_server_pb.VolumeServerState{}
+	proto.Merge(st.pb, state)
+	st.pb.Version = st.pb.GetVersion() + 1
-	origState := st.Pb
-	st.Pb = state
-	err := st.Save()
+	err := st.save(false)
 	if err != nil {
 		// restore the original state upon save failures, to avoid skew between in-memory and disk state protos.
-		st.Pb = origState
+		st.pb = origState
 	}
 
 	return err