diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 8b445b85b..1b1d0763d 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -122,6 +122,69 @@ jobs: path: test/s3tables/catalog/test-output.log retention-days: 3 + trino-iceberg-catalog-tests: + name: Trino Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull Trino image + run: docker pull trinodb/trino:479 + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Trino Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/catalog_trino + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Trino Iceberg Catalog Tests ===" + + # Run Trino + Iceberg catalog integration tests + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "Trino Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_trino + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: trino-iceberg-catalog-test-logs + path: test/s3tables/catalog_trino/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3/policy/policy_test.go b/test/s3/policy/policy_test.go index 2b181bf89..998bbabdc 100644 --- a/test/s3/policy/policy_test.go +++ b/test/s3/policy/policy_test.go @@ -16,22 +16,25 @@ import ( "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "github.com/stretchr/testify/require" ) // TestCluster manages the weed mini instance for integration testing type TestCluster struct { - dataDir string - ctx context.Context - cancel context.CancelFunc - isRunning bool - wg sync.WaitGroup - masterPort int - volumePort int - filerPort int - s3Port int - s3Endpoint string + dataDir string + ctx context.Context + cancel context.CancelFunc + isRunning bool + wg sync.WaitGroup + masterPort int + masterGrpcPort int + volumePort int + filerPort int + filerGrpcPort int + s3Port int + s3Endpoint string } func TestS3PolicyShellRevised(t *testing.T) { @@ -53,8 +56,8 @@ func TestS3PolicyShellRevised(t *testing.T) { require.NoError(t, tmpPolicyFile.Close()) weedCmd := "weed" - masterAddr := fmt.Sprintf("127.0.0.1:%d", cluster.masterPort) - filerAddr := fmt.Sprintf("127.0.0.1:%d", cluster.filerPort) + masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) + filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) // Put execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=testpolicy -file=%s", tmpPolicyFile.Name())) @@ -156,17 +159,16 @@ func findAvailablePort() (int, error) { // findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available func findAvailablePortPair() (int, int, error) { + httpPort, err := findAvailablePort() + if err != nil { + return 0, 0, err + } for i := 0; i < 100; i++ { - httpPort, err := findAvailablePort() + grpcPort, err := findAvailablePort() if err != nil { return 0, 0, err } - grpcPort := httpPort + 10000 - - // check if grpc port is available - listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) - if err == nil { - listener.Close() + if grpcPort != httpPort { return httpPort, grpcPort, nil } } @@ -188,14 +190,16 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { ctx, cancel := context.WithCancel(context.Background()) s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port) cluster := &TestCluster{ - dataDir: testDir, - ctx: ctx, - cancel: cancel, - masterPort: masterPort, - volumePort: volumePort, - filerPort: filerPort, - s3Port: s3Port, - s3Endpoint: s3Endpoint, + dataDir: testDir, + ctx: ctx, + cancel: cancel, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + volumePort: volumePort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + s3Port: s3Port, + s3Endpoint: s3Endpoint, } // Disable authentication for tests diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index 592526a93..df2a0bcd1 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -275,7 +275,7 @@ func TestIcebergNamespaces(t *testing.T) { env.StartSeaweedFS(t) // Create the default table bucket first via S3 - createTableBucket(t, env, "default") + createTableBucket(t, env, "warehouse") // Test GET /v1/namespaces (should return empty list initially) resp, err := http.Get(env.IcebergURL() + "/v1/namespaces") diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go new file mode 100644 index 000000000..e6bfac357 --- /dev/null +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -0,0 +1,536 @@ +package catalog_trino + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + trinoContainer string + dockerAvailable bool + accessKey string + secretKey string +} + +func TestTrinoIcebergCatalog(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Trino integration test") + } + + fmt.Printf(">>> Starting SeaweedFS...\n") + env.StartSeaweedFS(t) + fmt.Printf(">>> SeaweedFS started.\n") + + catalogBucket := "warehouse" + tableBucket := "iceberg-tables" + fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) + createTableBucket(t, env, tableBucket) + fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket) + createTableBucket(t, env, catalogBucket) + fmt.Printf(">>> All buckets created.\n") + + // Test Iceberg REST API directly + testIcebergRestAPI(t, env) + + configDir := env.writeTrinoConfig(t, catalogBucket) + env.startTrinoContainer(t, configDir) + waitForTrino(t, env.trinoContainer, 60*time.Second) + + schemaName := "trino_" + randomString(6) + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) + output := runTrinoSQL(t, env.trinoContainer, "SHOW SCHEMAS FROM iceberg") + if !strings.Contains(output, schemaName) { + t.Fatalf("Expected schema %s in output:\n%s", schemaName, output) + } + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SHOW TABLES FROM iceberg.%s", schemaName)) +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); os.IsNotExist(err) { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-trino-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := findBindIP() + + masterPort, masterGrpcPort := mustFreePortPair(t, "Master") + volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") + filerPort, filerGrpcPort := mustFreePortPair(t, "Filer") + s3Port, s3GrpcPort := mustFreePortPair(t, "S3") + icebergPort := mustFreePort(t, "Iceberg") + + return &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + icebergPort: icebergPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + dockerAvailable: hasDocker(), + accessKey: "AKIAIOSFODNN7EXAMPLE", + secretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + } +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + // Create IAM config file + iamConfigPath := filepath.Join(env.dataDir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "actions": [ + "Admin", + "Read", + "List", + "Tagging", + "Write" + ] + } + ] +}`, env.accessKey, env.secretKey) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Set AWS credentials in environment (for compatibility) + cmd.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + ) + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + // Try to check if Iceberg API is ready + // First try checking the /v1/config endpoint (requires auth, so will return 401 if server is up) + icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + if !env.waitForService(icebergURL, 30*time.Second) { + // Try to get more info about why it failed + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(icebergURL) + if err != nil { + t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err) + } else { + t.Logf("WARNING: Iceberg service returned status %d at %s", resp.StatusCode, icebergURL) + resp.Body.Close() + } + t.Fatalf("Iceberg REST API did not become ready") + } +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.trinoContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.trinoContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err != nil { + // Service not responding yet + time.Sleep(500 * time.Millisecond) + continue + } + statusCode := resp.StatusCode + resp.Body.Close() + // Accept 2xx status codes (successful responses) + if statusCode >= 200 && statusCode < 300 { + return true + } + // Also accept 401/403 (auth errors mean service is up, just needs credentials) + if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { + return true + } + // For other status codes, keep trying + time.Sleep(500 * time.Millisecond) + } + return false +} + +func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { + t.Helper() + fmt.Printf(">>> Testing Iceberg REST API directly...\n") + + // First, verify the service is listening + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort)) + if err != nil { + t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err) + } + conn.Close() + t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort) + + // Test /v1/config endpoint + url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + t.Logf("Testing Iceberg REST API at %s", url) + + resp, err := http.Get(url) + if err != nil { + t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err) + } + defer resp.Body.Close() + + t.Logf("Iceberg REST API response status: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + t.Logf("Iceberg REST API response body: %s", string(body)) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected 200 OK from /v1/config, got %d", resp.StatusCode) + } +} + +func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket string) string { + t.Helper() + + configDir := filepath.Join(env.dataDir, "trino") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("Failed to create Trino config dir: %v", err) + } + + config := fmt.Sprintf(`connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://host.docker.internal:%d +iceberg.rest-catalog.warehouse=s3://%s/ +iceberg.file-format=PARQUET + +# S3 storage config +fs.native-s3.enabled=true +s3.endpoint=http://host.docker.internal:%d +s3.path-style-access=true +s3.signer-type=AwsS3V4Signer +s3.aws-access-key=%s +s3.aws-secret-key=%s +s3.region=us-west-2 + +# REST catalog authentication +iceberg.rest-catalog.security=SIGV4 +`, env.icebergPort, warehouseBucket, env.s3Port, env.accessKey, env.secretKey) + + if err := os.WriteFile(filepath.Join(configDir, "iceberg.properties"), []byte(config), 0644); err != nil { + t.Fatalf("Failed to write Trino config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startTrinoContainer(t *testing.T, configDir string) { + t.Helper() + + containerName := "seaweed-trino-" + randomString(8) + env.trinoContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-v", fmt.Sprintf("%s:/etc/trino/catalog", configDir), + "-v", fmt.Sprintf("%s:/test", env.dataDir), + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-west-2", + "trinodb/trino:479", + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Trino container: %v\n%s", err, string(output)) + } +} + +func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastOutput []byte + retryCount := 0 + for time.Now().Before(deadline) { + // Try system catalog query as a readiness check + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "system", "--schema", "runtime", + "--execute", "SELECT 1", + ) + if output, err := cmd.CombinedOutput(); err == nil { + return + } else { + lastOutput = output + outputStr := string(output) + if strings.Contains(outputStr, "No such container") || + strings.Contains(outputStr, "is not running") { + break + } + retryCount++ + } + time.Sleep(1 * time.Second) + } + + // If we can't connect to system catalog, try to at least connect to Trino server + cmd := exec.Command("docker", "exec", containerName, "trino", "--version") + if err := cmd.Run(); err == nil { + // Trino process is running, even if catalog isn't ready yet + // Give it a bit more time + time.Sleep(5 * time.Second) + return + } + + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) +} + +func runTrinoSQL(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) + } + return string(output) +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + // Use weed shell to create the table bucket + // Create with "000000000000" account ID (matches AccountAdmin.Id from auth_credentials.go) + // This ensures bucket owner matches authenticated identity's Account.Id + cmd := exec.Command(env.weedBinary, "shell", + fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + fmt.Printf(">>> EXECUTING: %v\n", cmd.Args) + output, err := cmd.CombinedOutput() + if err != nil { + fmt.Printf(">>> ERROR Output: %s\n", string(output)) + t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } + fmt.Printf(">>> SUCCESS: Created table bucket %s\n", bucketName) + + t.Logf("Created table bucket: %s", bucketName) +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + // Create an AWS S3 client with the test credentials pointing to our local server + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://%s:%d", env.bindIP, env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + // Create the bucket using standard S3 API + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("Failed to create object bucket %s: %v", bucketName, err) + } +} + +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + port, err := getFreePort() + if err != nil { + t.Fatalf("Failed to get free port for %s: %v", name, err) + } + return port +} + +func mustFreePortPair(t *testing.T, name string) (int, int) { + t.Helper() + + httpPort, grpcPort, err := findAvailablePortPair() + if err != nil { + t.Fatalf("Failed to get free port pair for %s: %v", name, err) + } + return httpPort, grpcPort +} + +func findAvailablePortPair() (int, int, error) { + httpPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + grpcPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + return httpPort, grpcPort, nil +} + +func getFreePort() (int, error) { + listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + return 0, err + } + defer listener.Close() + + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +} + +func findBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + if _, err := rand.Read(b); err != nil { + panic("failed to generate random string: " + err.Error()) + } + for i := range b { + b[i] = charset[int(b[i])%len(charset)] + } + return string(b) +} diff --git a/weed/command/mini.go b/weed/command/mini.go index 956872de6..278b8db6e 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -963,8 +963,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // Determine bind IP for health checks bindIp := getBindIp() - // Prepare master address - masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port) + // Prepare master address with gRPC port + masterAddr := string(pb.NewServerAddress(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc)) // Set admin options *miniAdminOptions.master = masterAddr diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 943b85519..88cadbb81 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -15,7 +15,7 @@ type ServerAddresses string type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { - if grpcPort == 0 || grpcPort == port+10000 { + if grpcPort == 0 { return ServerAddress(util.JoinHostPort(host, port)) } return ServerAddress(util.JoinHostPort(host, port) + "." + strconv.Itoa(grpcPort)) @@ -25,10 +25,6 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress { if grpcPort == 0 { return ServerAddress(address) } - _, port, _ := hostAndPort(address) - if uint64(grpcPort) == port+10000 { - return ServerAddress(address) - } return ServerAddress(address + "." + strconv.Itoa(grpcPort)) } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0a36c39a8..d305f8b46 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -538,17 +538,45 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3 } iam.m.Lock() + // Save existing environment-based identities before replacement + // This ensures AWS_ACCESS_KEY_ID credentials are preserved + envIdentities := make([]*Identity, 0) + for _, ident := range iam.identities { + if ident.IsStatic && strings.HasPrefix(ident.Name, "admin-") { + // This is an environment-based admin identity, preserve it + envIdentities = append(envIdentities, ident) + } + } + // atomically switch iam.identities = identities iam.identityAnonymous = identityAnonymous iam.accounts = accounts iam.emailAccount = emailAccount - iam.accessKeyIdent = accessKeyIdent iam.nameToIdentity = nameToIdentity + iam.accessKeyIdent = accessKeyIdent iam.policies = policies + + // Re-add environment-based identities that were preserved + for _, envIdent := range envIdentities { + // Check if this identity already exists in the new config + exists := false + for _, ident := range iam.identities { + if ident.Name == envIdent.Name { + exists = true + break + } + } + if !exists { + iam.identities = append(iam.identities, envIdent) + iam.accessKeyIdent[envIdent.Credentials[0].AccessKey] = envIdent + iam.nameToIdentity[envIdent.Name] = envIdent + } + } + // Update authentication state based on whether identities exist // Once enabled, keep it enabled (one-way toggle) - authJustEnabled := iam.updateAuthenticationState(len(identities)) + authJustEnabled := iam.updateAuthenticationState(len(iam.identities)) iam.m.Unlock() if authJustEnabled { @@ -778,9 +806,10 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap iam.identityAnonymous = identityAnonymous iam.accounts = accounts iam.emailAccount = emailAccount - iam.accessKeyIdent = accessKeyIdent iam.nameToIdentity = nameToIdentity + iam.accessKeyIdent = accessKeyIdent iam.policies = policies + iam.accessKeyIdent = accessKeyIdent // Update authentication state based on whether identities exist // Once enabled, keep it enabled (one-way toggle) authJustEnabled := iam.updateAuthenticationState(len(identities)) diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go index e5960c76c..f3dc4c7c9 100644 --- a/weed/s3api/auth_signature_v4.go +++ b/weed/s3api/auth_signature_v4.go @@ -22,6 +22,7 @@ import ( "crypto/hmac" "crypto/sha256" "crypto/subtle" + "encoding/base64" "encoding/hex" "io" "net" @@ -104,6 +105,24 @@ func getContentSha256Cksum(r *http.Request) string { return emptySHA256 } +// normalizePayloadHash converts base64-encoded payload hash to hex format. +// AWS SigV4 canonical requests always use hex-encoded SHA256. +func normalizePayloadHash(payloadHashValue string) string { + // Special values and hex-encoded hashes don't need conversion + if payloadHashValue == emptySHA256 || payloadHashValue == unsignedPayload || + payloadHashValue == streamingContentSHA256 || payloadHashValue == streamingContentSHA256Trailer || + payloadHashValue == streamingUnsignedPayload || len(payloadHashValue) == 64 { + return payloadHashValue + } + + // Try to decode as base64 and convert to hex + if decodedBytes, err := base64.StdEncoding.DecodeString(payloadHashValue); err == nil && len(decodedBytes) == 32 { + return hex.EncodeToString(decodedBytes) + } + + return payloadHashValue +} + // signValues data type represents structured form of AWS Signature V4 header. type signValues struct { Credential credentialHeader @@ -485,6 +504,10 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) } } + // Normalize payload hash to hex format for canonical request + // AWS SigV4 canonical requests always use hex-encoded SHA256 + normalizedPayload := normalizePayloadHash(hashedPayload) + return &v4AuthInfo{ Signature: signV4Values.Signature, AccessKey: signV4Values.Credential.accessKey, @@ -493,7 +516,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) Region: signV4Values.Credential.scope.region, Service: signV4Values.Credential.scope.service, Scope: signV4Values.Credential.getScope(), - HashedPayload: hashedPayload, + HashedPayload: normalizedPayload, IsPresigned: false, }, s3err.ErrNone } diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 8122b4627..f0e454a51 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -18,37 +18,11 @@ import ( "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" ) -func (s *Server) checkAuth(w http.ResponseWriter, r *http.Request, action s3api.Action, bucketName string) bool { - identityName := s3_constants.GetIdentityNameFromContext(r) - if identityName == "" { - writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required") - return false - } - - identityObj := s3_constants.GetIdentityFromContext(r) - if identityObj == nil { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: missing identity") - return false - } - identity, ok := identityObj.(*s3api.Identity) - if !ok { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: invalid identity") - return false - } - - if !identity.CanDo(action, bucketName, "") { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied") - return false - } - return true -} - // FilerClient provides access to the filer for storage operations. type FilerClient interface { WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error @@ -79,17 +53,20 @@ func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server { // RegisterRoutes registers Iceberg REST API routes on the provided router. func (s *Server) RegisterRoutes(router *mux.Router) { - // Configuration endpoint - router.HandleFunc("/v1/config", s.Auth(s.handleConfig)).Methods(http.MethodGet) + // Add middleware to log all requests/responses + router.Use(loggingMiddleware) + + // Configuration endpoint - no auth needed for config + router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet) - // Namespace endpoints + // Namespace endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete) - // Table endpoints + // Table endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet) @@ -97,7 +74,7 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - // With prefix support + // With prefix support - wrapped with Auth middleware router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) @@ -110,7 +87,48 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - glog.V(0).Infof("Registered Iceberg REST Catalog routes") + // Catch-all for debugging + router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI) + writeError(w, http.StatusNotFound, "NotFound", "Path not found") + }) + + glog.V(2).Infof("Registered Iceberg REST Catalog routes") +} + +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr) + + // Log all headers for debugging + glog.V(2).Infof("Iceberg REST headers:") + for name, values := range r.Header { + for _, value := range values { + // Redact sensitive headers + if name == "Authorization" && len(value) > 20 { + glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:]) + } else { + glog.V(2).Infof(" %s: %s", name, value) + } + } + } + + // Create a response writer that captures the status code + wrapped := &responseWriter{ResponseWriter: w} + next.ServeHTTP(wrapped, r) + + glog.V(2).Infof("Iceberg REST response: %s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode) + }) +} + +type responseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *responseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) } func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc { @@ -293,8 +311,8 @@ func getBucketFromPrefix(r *http.Request) string { if prefix := vars["prefix"]; prefix != "" { return prefix } - // Default bucket if no prefix - return "default" + // Default bucket if no prefix - use "warehouse" for Iceberg + return "warehouse" } // buildTableBucketARN builds an ARN for a table bucket. @@ -305,25 +323,28 @@ func buildTableBucketARN(bucketName string) string { // handleConfig returns catalog configuration. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { - bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } + glog.Infof("handleConfig: START") + glog.Infof("handleConfig: setting Content-Type header") + w.Header().Set("Content-Type", "application/json") config := CatalogConfig{ Defaults: map[string]string{}, Overrides: map[string]string{}, } - writeJSON(w, http.StatusOK, config) + glog.Infof("handleConfig: encoding JSON") + if err := json.NewEncoder(w).Encode(config); err != nil { + glog.Warningf("handleConfig: Failed to encode config: %v", err) + } + glog.Infof("handleConfig: COMPLETE") } // handleListNamespaces lists namespaces in a catalog. func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to list namespaces var resp s3tables.ListNamespacesResponse req := &s3tables.ListNamespacesRequest{ @@ -333,11 +354,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName) }) if err != nil { - glog.V(1).Infof("Iceberg: ListNamespaces error: %v", err) + glog.Infof("Iceberg: ListNamespaces error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -357,11 +378,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { // handleCreateNamespace creates a new namespace. func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + var req CreateNamespaceRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body") @@ -382,15 +403,18 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, "") + glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName) + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName) }) if err != nil { + glog.Errorf("Iceberg: handleCreateNamespace error: %v", err) + if strings.Contains(err.Error(), "already exists") { writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) return } - glog.V(1).Infof("Iceberg: CreateNamespace error: %v", err) + glog.Infof("Iceberg: CreateNamespace error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -418,11 +442,11 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to get namespace getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, @@ -432,7 +456,7 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -462,11 +486,11 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -475,7 +499,7 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -500,11 +524,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -512,10 +536,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchNamespaceException", fmt.Sprintf("Namespace does not exist: %v", namespace)) return @@ -542,11 +567,11 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + listReq := &s3tables.ListTablesRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -556,7 +581,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName) }) if err != nil { @@ -605,11 +630,11 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Generate UUID for the new table tableUUID := uuid.New() location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name) @@ -657,7 +682,7 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, identityName) }) if err != nil { @@ -696,11 +721,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -710,10 +735,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName)) return @@ -771,11 +797,11 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -785,7 +811,7 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -807,11 +833,11 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -820,7 +846,7 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, identityName) }) if err != nil { @@ -849,9 +875,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } + bucketARN := buildTableBucketARN(bucketName) + + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) // Parse the commit request var req CommitTableRequest @@ -860,8 +887,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { return } - bucketARN := buildTableBucketARN(bucketName) - // First, load current table metadata getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -872,7 +897,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -985,7 +1010,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // 1. Write metadata file (this would normally be an S3 PutObject, // but s3tables manager handles the metadata storage logic) // For now, we assume s3tables.UpdateTable handles the reference update. - return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName) }) if err != nil { @@ -1002,14 +1027,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, result) } -// loadTableResultJSON is used for JSON serialization of LoadTableResult. -// It wraps table.Metadata (which is an interface) for proper JSON output. -type loadTableResultJSON struct { - MetadataLocation string `json:"metadata-location,omitempty"` - Metadata table.Metadata `json:"metadata"` - Config iceberg.Properties `json:"config,omitempty"` -} - // newTableMetadata creates a new table.Metadata object with the given parameters. // Uses iceberg-go's MetadataBuilder pattern for proper spec compliance. func newTableMetadata( diff --git a/weed/s3api/s3api_tables.go b/weed/s3api/s3api_tables.go index 54177c4d9..b93d5879f 100644 --- a/weed/s3api/s3api_tables.go +++ b/weed/s3api/s3api_tables.go @@ -632,6 +632,7 @@ func buildUntagResourceRequest(r *http.Request) (interface{}, error) { // which performs granular permission checks based on the specific operation. func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("S3Tables: authenticateS3Tables called, iam.isEnabled()=%t", s3a.iam.isEnabled()) if !s3a.iam.isEnabled() { f(w, r) return @@ -640,15 +641,19 @@ func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFun // Use AuthSignatureOnly to authenticate the request without authorizing specific actions identity, errCode := s3a.iam.AuthSignatureOnly(r) if errCode != s3err.ErrNone { + glog.Errorf("S3Tables: AuthSignatureOnly failed: %v", errCode) s3err.WriteErrorResponse(w, r, errCode) return } // Store the authenticated identity in request context if identity != nil && identity.Name != "" { + glog.V(2).Infof("S3Tables: authenticated identity Name=%s Account.Id=%s", identity.Name, identity.Account.Id) ctx := s3_constants.SetIdentityNameInContext(r.Context(), identity.Name) ctx = s3_constants.SetIdentityInContext(ctx, identity) r = r.WithContext(ctx) + } else { + glog.V(2).Infof("S3Tables: authenticated identity is nil or empty name") } f(w, r) diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index dae11d562..39b10ce0e 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -164,9 +164,32 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request, // This is also used as the principal for permission checks, ensuring alignment between // the caller identity and ownership verification when IAM is enabled. func (h *S3TablesHandler) getAccountID(r *http.Request) string { + identityRaw := s3_constants.GetIdentityFromContext(r) + if identityRaw != nil { + // Use reflection to access the Account.Id field to avoid import cycle + val := reflect.ValueOf(identityRaw) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + if val.Kind() == reflect.Struct { + accountField := val.FieldByName("Account") + if accountField.IsValid() && !accountField.IsNil() { + accountVal := accountField.Elem() + if accountVal.Kind() == reflect.Struct { + idField := accountVal.FieldByName("Id") + if idField.IsValid() && idField.Kind() == reflect.String { + id := idField.String() + return id + } + } + } + } + } + if identityName := s3_constants.GetIdentityNameFromContext(r); identityName != "" { return identityName } + if accountID := r.Header.Get(s3_constants.AmzAccountId); accountID != "" { return accountID } diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index a732edc6e..492a53241 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -9,13 +9,16 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) // handleCreateNamespace creates a new namespace in a table bucket func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + glog.Errorf("S3Tables: handleCreateNamespace called") var req CreateNamespaceRequest if err := h.readRequestBody(r, &req); err != nil { + glog.Errorf("S3Tables: handleCreateNamespace failed to read request body: %v", err) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } @@ -83,12 +86,14 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName) principal := h.getAccountID(r) identityActions := getIdentityActions(r) + glog.Infof("S3Tables: CreateNamespace permission check - principal=%s, owner=%s, actions=%v", principal, bucketMetadata.OwnerAccountID, identityActions) if !CheckPermissionWithContext("CreateNamespace", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{ TableBucketName: bucketName, Namespace: namespaceName, TableBucketTags: bucketTags, IdentityActions: identityActions, }) { + glog.Infof("S3Tables: Permission denied for CreateNamespace - principal=%s, owner=%s", principal, bucketMetadata.OwnerAccountID) h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace in this bucket") return ErrAccessDenied } diff --git a/weed/s3api/s3tables/permissions.go b/weed/s3api/s3tables/permissions.go index af28c0a88..17d3b1a04 100644 --- a/weed/s3api/s3tables/permissions.go +++ b/weed/s3api/s3tables/permissions.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" ) @@ -110,6 +111,8 @@ func CheckPermissionWithContext(operation, principal, owner, resourcePolicy, res return true } + glog.V(2).Infof("S3Tables: CheckPermission operation=%s principal=%s owner=%s", operation, principal, owner) + return checkPermission(operation, principal, owner, resourcePolicy, resourceARN, ctx) }