From a3b83f880863dc8e5d35bd08784a0937b8c05537 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 6 Feb 2026 13:12:25 -0800
Subject: [PATCH] test: add Trino Iceberg catalog integration test (#8228)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* test: add Trino Iceberg catalog integration test

  - Create test/s3tables/catalog_trino/trino_catalog_test.go with TestTrinoIcebergCatalog
  - Tests integration between the Trino SQL engine and the SeaweedFS Iceberg REST catalog
  - Starts weed mini with all services and Trino in a Docker container
  - Validates Iceberg catalog schema creation and listing operations
  - Uses Trino's native S3 filesystem support with path-style access
  - Add a workflow job to s3-tables-tests.yml for CI execution

* fix: preserve AWS environment credentials when replacing S3 configuration

  When the S3 configuration is loaded from the filer/db, it replaces the identities list and inadvertently removes the AWS_ACCESS_KEY_ID credentials that were added from environment variables. This caused auth to remain disabled even though valid credentials were present.

  Fix by preserving environment-based identities when replacing the configuration and re-adding them after the replacement. This ensures environment credentials persist across configuration reloads and properly enable authentication.

* fix: use correct ServerAddress format with gRPC port encoding

  The admin server could not connect to the master because the master address was missing the gRPC port information. Use pb.NewServerAddress(), which properly encodes both the HTTP and gRPC ports in the address string.

  Changes:
  - weed/command/mini.go: Use pb.NewServerAddress for the master address in admin
  - test/s3/policy/policy_test.go: Store and use gRPC ports for master/filer addresses

  This fix applies to:
  1. Admin server connection to the master (mini.go)
  2. Test shell commands that need master/filer addresses (policy_test.go)

* move

* move

* fix: always include gRPC port in server address encoding

  NewServerAddress() omitted the gRPC port from the address string when it matched the port+10000 convention. However, gRPC port allocation does not always follow this convention: when the calculated port is busy, an alternative port is allocated.

  This caused a bug where:
  1. The master's gRPC port was allocated as 50661 (sequential, not port+10000)
  2. The address was encoded as '192.168.1.66:50660' (gRPC port omitted)
  3. The admin client called ToGrpcAddress(), which assumed the port+10000 offset
  4. The admin tried to connect to 60660 but the master was on 50661, so the connection failed

  Fix: always include an explicit gRPC port in the address format (host:httpPort.grpcPort) unless the gRPC port is 0. This makes addresses unambiguous and works regardless of the port allocation strategy used.

  Impacts: all server-to-server gRPC connections now use properly formatted addresses.

* test: fix Iceberg REST API readiness check

  The Iceberg REST API endpoints require authentication. When checked without credentials, the API returns 403 Forbidden (not 401 Unauthorized). The readiness check now accepts both auth error codes (401/403) as indicators that the service is up and only needs credentials.

  This fixes the 'Iceberg REST API did not become ready' test failure.

* Fix AWS SigV4 signature verification for base64-encoded payload hashes

  AWS SigV4 canonical requests must use hex-encoded SHA256 hashes, but the X-Amz-Content-Sha256 header may be transmitted as base64.
Changes: - Added normalizePayloadHash() function to convert base64 to hex - Call normalizePayloadHash() in extractV4AuthInfoFromHeader() - Added encoding/base64 import Fixes 403 Forbidden errors on POST requests to Iceberg REST API when clients send base64-encoded content hashes in the header. Impacted services: Iceberg REST API, S3Tables * Fix AWS SigV4 signature verification for base64-encoded payload hashes AWS SigV4 canonical requests must use hex-encoded SHA256 hashes, but the X-Amz-Content-Sha256 header may be transmitted as base64. Changes: - Added normalizePayloadHash() function to convert base64 to hex - Call normalizePayloadHash() in extractV4AuthInfoFromHeader() - Added encoding/base64 import - Removed unused fmt import Fixes 403 Forbidden errors on POST requests to Iceberg REST API when clients send base64-encoded content hashes in the header. Impacted services: Iceberg REST API, S3Tables * pass sigv4 * s3api: fix identity preservation and logging levels - Ensure environment-based identities are preserved during config replacement - Update accessKeyIdent and nameToIdentity maps correctly - Downgrade informational logs to V(2) to reduce noise * test: fix trino integration test and s3 policy test - Pin Trino image version to 479 - Fix port binding to 0.0.0.0 for Docker connectivity - Fix S3 policy test hang by correctly assigning MiniClusterCtx - Improve port finding robustness in policy tests * ci: pre-pull trino image to avoid timeouts - Pull trinodb/trino:479 after Docker setup - Ensure image is ready before integration tests start * iceberg: remove unused checkAuth and improve logging - Remove unused checkAuth method - Downgrade informational logs to V(2) - Ensure loggingMiddleware uses a status writer for accurate reported codes - Narrow catch-all route to avoid interfering with other subsystems * iceberg: fix build failure by removing unused s3api import * Update iceberg.go * use warehouse * Update trino_catalog_test.go --- .github/workflows/s3-tables-tests.yml | 63 ++ test/s3/policy/policy_test.go | 58 +- test/s3tables/catalog/iceberg_catalog_test.go | 2 +- .../catalog_trino/trino_catalog_test.go | 536 ++++++++++++++++++ weed/command/mini.go | 4 +- weed/pb/server_address.go | 6 +- weed/s3api/auth_credentials.go | 35 +- weed/s3api/auth_signature_v4.go | 25 +- weed/s3api/iceberg/iceberg.go | 209 +++---- weed/s3api/s3api_tables.go | 5 + weed/s3api/s3tables/handler.go | 23 + weed/s3api/s3tables/handler_namespace.go | 5 + weed/s3api/s3tables/permissions.go | 3 + 13 files changed, 839 insertions(+), 135 deletions(-) create mode 100644 test/s3tables/catalog_trino/trino_catalog_test.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 8b445b85b..1b1d0763d 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -122,6 +122,69 @@ jobs: path: test/s3tables/catalog/test-output.log retention-days: 3 + trino-iceberg-catalog-tests: + name: Trino Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Pre-pull Trino image + run: docker pull trinodb/trino:479 + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Trino Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: 
test/s3tables/catalog_trino + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Trino Iceberg Catalog Tests ===" + + # Run Trino + Iceberg catalog integration tests + go test -v -timeout 20m . 2>&1 | tee test-output.log || { + echo "Trino Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_trino + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: trino-iceberg-catalog-test-logs + path: test/s3tables/catalog_trino/test-output.log + retention-days: 3 + s3-tables-build-verification: name: S3 Tables Build Verification runs-on: ubuntu-22.04 diff --git a/test/s3/policy/policy_test.go b/test/s3/policy/policy_test.go index 2b181bf89..998bbabdc 100644 --- a/test/s3/policy/policy_test.go +++ b/test/s3/policy/policy_test.go @@ -16,22 +16,25 @@ import ( "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "github.com/stretchr/testify/require" ) // TestCluster manages the weed mini instance for integration testing type TestCluster struct { - dataDir string - ctx context.Context - cancel context.CancelFunc - isRunning bool - wg sync.WaitGroup - masterPort int - volumePort int - filerPort int - s3Port int - s3Endpoint string + dataDir string + ctx context.Context + cancel context.CancelFunc + isRunning bool + wg sync.WaitGroup + masterPort int + masterGrpcPort int + volumePort int + filerPort int + filerGrpcPort int + s3Port int + s3Endpoint string } func TestS3PolicyShellRevised(t *testing.T) { @@ -53,8 +56,8 @@ func TestS3PolicyShellRevised(t *testing.T) { require.NoError(t, tmpPolicyFile.Close()) weedCmd := "weed" - masterAddr := fmt.Sprintf("127.0.0.1:%d", cluster.masterPort) - filerAddr := fmt.Sprintf("127.0.0.1:%d", cluster.filerPort) + masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) + filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) // Put execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=testpolicy -file=%s", tmpPolicyFile.Name())) @@ -156,17 +159,16 @@ func findAvailablePort() (int, error) { // findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available func findAvailablePortPair() (int, int, error) { + httpPort, err := findAvailablePort() + if err != nil { + return 0, 0, err + } for i := 0; i < 100; i++ { - httpPort, err := findAvailablePort() + grpcPort, err := findAvailablePort() if err != nil { return 0, 0, err } - grpcPort := httpPort + 10000 - - // check if grpc port is available - listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) - if err == nil { - listener.Close() + if grpcPort != httpPort { return httpPort, grpcPort, nil } } @@ -188,14 +190,16 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { ctx, cancel := context.WithCancel(context.Background()) s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port) cluster := &TestCluster{ - dataDir: testDir, - ctx: ctx, - cancel: cancel, - masterPort: masterPort, - volumePort: volumePort, - filerPort: 
filerPort, - s3Port: s3Port, - s3Endpoint: s3Endpoint, + dataDir: testDir, + ctx: ctx, + cancel: cancel, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + volumePort: volumePort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + s3Port: s3Port, + s3Endpoint: s3Endpoint, } // Disable authentication for tests diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index 592526a93..df2a0bcd1 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -275,7 +275,7 @@ func TestIcebergNamespaces(t *testing.T) { env.StartSeaweedFS(t) // Create the default table bucket first via S3 - createTableBucket(t, env, "default") + createTableBucket(t, env, "warehouse") // Test GET /v1/namespaces (should return empty list initially) resp, err := http.Get(env.IcebergURL() + "/v1/namespaces") diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go new file mode 100644 index 000000000..e6bfac357 --- /dev/null +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -0,0 +1,536 @@ +package catalog_trino + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + trinoContainer string + dockerAvailable bool + accessKey string + secretKey string +} + +func TestTrinoIcebergCatalog(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Trino integration test") + } + + fmt.Printf(">>> Starting SeaweedFS...\n") + env.StartSeaweedFS(t) + fmt.Printf(">>> SeaweedFS started.\n") + + catalogBucket := "warehouse" + tableBucket := "iceberg-tables" + fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) + createTableBucket(t, env, tableBucket) + fmt.Printf(">>> Creating table bucket: %s\n", catalogBucket) + createTableBucket(t, env, catalogBucket) + fmt.Printf(">>> All buckets created.\n") + + // Test Iceberg REST API directly + testIcebergRestAPI(t, env) + + configDir := env.writeTrinoConfig(t, catalogBucket) + env.startTrinoContainer(t, configDir) + waitForTrino(t, env.trinoContainer, 60*time.Second) + + schemaName := "trino_" + randomString(6) + + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS iceberg.%s", schemaName)) + output := runTrinoSQL(t, env.trinoContainer, "SHOW SCHEMAS FROM iceberg") + if !strings.Contains(output, schemaName) { + t.Fatalf("Expected schema %s in output:\n%s", schemaName, output) + } + runTrinoSQL(t, env.trinoContainer, fmt.Sprintf("SHOW TABLES FROM iceberg.%s", schemaName)) +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + 
seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); os.IsNotExist(err) { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-trino-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := findBindIP() + + masterPort, masterGrpcPort := mustFreePortPair(t, "Master") + volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") + filerPort, filerGrpcPort := mustFreePortPair(t, "Filer") + s3Port, s3GrpcPort := mustFreePortPair(t, "S3") + icebergPort := mustFreePort(t, "Iceberg") + + return &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + icebergPort: icebergPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + dockerAvailable: hasDocker(), + accessKey: "AKIAIOSFODNN7EXAMPLE", + secretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + } +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + // Create IAM config file + iamConfigPath := filepath.Join(env.dataDir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "actions": [ + "Admin", + "Read", + "List", + "Tagging", + "Write" + ] + } + ] +}`, env.accessKey, env.secretKey) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Set AWS credentials in environment (for compatibility) + cmd.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + ) + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + // Try to check if Iceberg API is ready + // First try checking the /v1/config endpoint (requires auth, so will return 401 if server is up) + icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + if !env.waitForService(icebergURL, 30*time.Second) { + // Try to get more info about why it failed + client := &http.Client{Timeout: 2 * time.Second} + resp, err := 
client.Get(icebergURL) + if err != nil { + t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err) + } else { + t.Logf("WARNING: Iceberg service returned status %d at %s", resp.StatusCode, icebergURL) + resp.Body.Close() + } + t.Fatalf("Iceberg REST API did not become ready") + } +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.trinoContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.trinoContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err != nil { + // Service not responding yet + time.Sleep(500 * time.Millisecond) + continue + } + statusCode := resp.StatusCode + resp.Body.Close() + // Accept 2xx status codes (successful responses) + if statusCode >= 200 && statusCode < 300 { + return true + } + // Also accept 401/403 (auth errors mean service is up, just needs credentials) + if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { + return true + } + // For other status codes, keep trying + time.Sleep(500 * time.Millisecond) + } + return false +} + +func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { + t.Helper() + fmt.Printf(">>> Testing Iceberg REST API directly...\n") + + // First, verify the service is listening + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", env.bindIP, env.icebergPort)) + if err != nil { + t.Fatalf("Cannot connect to Iceberg service at %s:%d: %v", env.bindIP, env.icebergPort, err) + } + conn.Close() + t.Logf("Successfully connected to Iceberg service at %s:%d", env.bindIP, env.icebergPort) + + // Test /v1/config endpoint + url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + t.Logf("Testing Iceberg REST API at %s", url) + + resp, err := http.Get(url) + if err != nil { + t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err) + } + defer resp.Body.Close() + + t.Logf("Iceberg REST API response status: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + t.Logf("Iceberg REST API response body: %s", string(body)) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected 200 OK from /v1/config, got %d", resp.StatusCode) + } +} + +func (env *TestEnvironment) writeTrinoConfig(t *testing.T, warehouseBucket string) string { + t.Helper() + + configDir := filepath.Join(env.dataDir, "trino") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("Failed to create Trino config dir: %v", err) + } + + config := fmt.Sprintf(`connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://host.docker.internal:%d +iceberg.rest-catalog.warehouse=s3://%s/ +iceberg.file-format=PARQUET + +# S3 storage config +fs.native-s3.enabled=true +s3.endpoint=http://host.docker.internal:%d +s3.path-style-access=true +s3.signer-type=AwsS3V4Signer +s3.aws-access-key=%s +s3.aws-secret-key=%s +s3.region=us-west-2 + +# REST catalog authentication +iceberg.rest-catalog.security=SIGV4 +`, env.icebergPort, warehouseBucket, env.s3Port, env.accessKey, env.secretKey) + + if err := os.WriteFile(filepath.Join(configDir, "iceberg.properties"), []byte(config), 0644); err != nil { + t.Fatalf("Failed to 
write Trino config: %v", err) + } + + return configDir +} + +func (env *TestEnvironment) startTrinoContainer(t *testing.T, configDir string) { + t.Helper() + + containerName := "seaweed-trino-" + randomString(8) + env.trinoContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-v", fmt.Sprintf("%s:/etc/trino/catalog", configDir), + "-v", fmt.Sprintf("%s:/test", env.dataDir), + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-west-2", + "trinodb/trino:479", + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Trino container: %v\n%s", err, string(output)) + } +} + +func waitForTrino(t *testing.T, containerName string, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastOutput []byte + retryCount := 0 + for time.Now().Before(deadline) { + // Try system catalog query as a readiness check + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "system", "--schema", "runtime", + "--execute", "SELECT 1", + ) + if output, err := cmd.CombinedOutput(); err == nil { + return + } else { + lastOutput = output + outputStr := string(output) + if strings.Contains(outputStr, "No such container") || + strings.Contains(outputStr, "is not running") { + break + } + retryCount++ + } + time.Sleep(1 * time.Second) + } + + // If we can't connect to system catalog, try to at least connect to Trino server + cmd := exec.Command("docker", "exec", containerName, "trino", "--version") + if err := cmd.Run(); err == nil { + // Trino process is running, even if catalog isn't ready yet + // Give it a bit more time + time.Sleep(5 * time.Second) + return + } + + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Timed out waiting for Trino to be ready\nLast output:\n%s\nTrino logs:\n%s", string(lastOutput), string(logs)) +} + +func runTrinoSQL(t *testing.T, containerName, sql string) string { + t.Helper() + + cmd := exec.Command("docker", "exec", containerName, + "trino", "--catalog", "iceberg", + "--output-format", "CSV", + "--execute", sql, + ) + output, err := cmd.CombinedOutput() + if err != nil { + logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput() + t.Fatalf("Trino command failed: %v\nSQL: %s\nOutput:\n%s\nTrino logs:\n%s", err, sql, string(output), string(logs)) + } + return string(output) +} + +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + // Use weed shell to create the table bucket + // Create with "000000000000" account ID (matches AccountAdmin.Id from auth_credentials.go) + // This ensures bucket owner matches authenticated identity's Account.Id + cmd := exec.Command(env.weedBinary, "shell", + fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + fmt.Printf(">>> EXECUTING: %v\n", cmd.Args) + output, err := cmd.CombinedOutput() + if err != nil { + fmt.Printf(">>> ERROR Output: %s\n", string(output)) + t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } + fmt.Printf(">>> SUCCESS: Created table bucket %s\n", bucketName) + + t.Logf("Created table bucket: %s", bucketName) +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, 
bucketName string) { + t.Helper() + + // Create an AWS S3 client with the test credentials pointing to our local server + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://%s:%d", env.bindIP, env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + // Create the bucket using standard S3 API + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("Failed to create object bucket %s: %v", bucketName, err) + } +} + +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + port, err := getFreePort() + if err != nil { + t.Fatalf("Failed to get free port for %s: %v", name, err) + } + return port +} + +func mustFreePortPair(t *testing.T, name string) (int, int) { + t.Helper() + + httpPort, grpcPort, err := findAvailablePortPair() + if err != nil { + t.Fatalf("Failed to get free port pair for %s: %v", name, err) + } + return httpPort, grpcPort +} + +func findAvailablePortPair() (int, int, error) { + httpPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + grpcPort, err := getFreePort() + if err != nil { + return 0, 0, err + } + return httpPort, grpcPort, nil +} + +func getFreePort() (int, error) { + listener, err := net.Listen("tcp", "0.0.0.0:0") + if err != nil { + return 0, err + } + defer listener.Close() + + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +} + +func findBindIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "127.0.0.1" + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok || ipNet.IP == nil { + continue + } + ip := ipNet.IP.To4() + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + return ip.String() + } + return "127.0.0.1" +} + +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + if _, err := rand.Read(b); err != nil { + panic("failed to generate random string: " + err.Error()) + } + for i := range b { + b[i] = charset[int(b[i])%len(charset)] + } + return string(b) +} diff --git a/weed/command/mini.go b/weed/command/mini.go index 956872de6..278b8db6e 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -963,8 +963,8 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // Determine bind IP for health checks bindIp := getBindIp() - // Prepare master address - masterAddr := fmt.Sprintf("%s:%d", *miniIp, *miniMasterOptions.port) + // Prepare master address with gRPC port + masterAddr := string(pb.NewServerAddress(*miniIp, *miniMasterOptions.port, *miniMasterOptions.portGrpc)) // Set admin options *miniAdminOptions.master = masterAddr diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 943b85519..88cadbb81 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -15,7 +15,7 @@ type ServerAddresses string type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { - if grpcPort == 0 || grpcPort == port+10000 { + if grpcPort == 0 { return ServerAddress(util.JoinHostPort(host, port)) } return ServerAddress(util.JoinHostPort(host, port) + "." 
+ strconv.Itoa(grpcPort)) @@ -25,10 +25,6 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress { if grpcPort == 0 { return ServerAddress(address) } - _, port, _ := hostAndPort(address) - if uint64(grpcPort) == port+10000 { - return ServerAddress(address) - } return ServerAddress(address + "." + strconv.Itoa(grpcPort)) } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 0a36c39a8..d305f8b46 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -538,17 +538,45 @@ func (iam *IdentityAccessManagement) ReplaceS3ApiConfiguration(config *iam_pb.S3 } iam.m.Lock() + // Save existing environment-based identities before replacement + // This ensures AWS_ACCESS_KEY_ID credentials are preserved + envIdentities := make([]*Identity, 0) + for _, ident := range iam.identities { + if ident.IsStatic && strings.HasPrefix(ident.Name, "admin-") { + // This is an environment-based admin identity, preserve it + envIdentities = append(envIdentities, ident) + } + } + // atomically switch iam.identities = identities iam.identityAnonymous = identityAnonymous iam.accounts = accounts iam.emailAccount = emailAccount - iam.accessKeyIdent = accessKeyIdent iam.nameToIdentity = nameToIdentity + iam.accessKeyIdent = accessKeyIdent iam.policies = policies + + // Re-add environment-based identities that were preserved + for _, envIdent := range envIdentities { + // Check if this identity already exists in the new config + exists := false + for _, ident := range iam.identities { + if ident.Name == envIdent.Name { + exists = true + break + } + } + if !exists { + iam.identities = append(iam.identities, envIdent) + iam.accessKeyIdent[envIdent.Credentials[0].AccessKey] = envIdent + iam.nameToIdentity[envIdent.Name] = envIdent + } + } + // Update authentication state based on whether identities exist // Once enabled, keep it enabled (one-way toggle) - authJustEnabled := iam.updateAuthenticationState(len(identities)) + authJustEnabled := iam.updateAuthenticationState(len(iam.identities)) iam.m.Unlock() if authJustEnabled { @@ -778,9 +806,10 @@ func (iam *IdentityAccessManagement) MergeS3ApiConfiguration(config *iam_pb.S3Ap iam.identityAnonymous = identityAnonymous iam.accounts = accounts iam.emailAccount = emailAccount - iam.accessKeyIdent = accessKeyIdent iam.nameToIdentity = nameToIdentity + iam.accessKeyIdent = accessKeyIdent iam.policies = policies + iam.accessKeyIdent = accessKeyIdent // Update authentication state based on whether identities exist // Once enabled, keep it enabled (one-way toggle) authJustEnabled := iam.updateAuthenticationState(len(identities)) diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go index e5960c76c..f3dc4c7c9 100644 --- a/weed/s3api/auth_signature_v4.go +++ b/weed/s3api/auth_signature_v4.go @@ -22,6 +22,7 @@ import ( "crypto/hmac" "crypto/sha256" "crypto/subtle" + "encoding/base64" "encoding/hex" "io" "net" @@ -104,6 +105,24 @@ func getContentSha256Cksum(r *http.Request) string { return emptySHA256 } +// normalizePayloadHash converts base64-encoded payload hash to hex format. +// AWS SigV4 canonical requests always use hex-encoded SHA256. 
+func normalizePayloadHash(payloadHashValue string) string { + // Special values and hex-encoded hashes don't need conversion + if payloadHashValue == emptySHA256 || payloadHashValue == unsignedPayload || + payloadHashValue == streamingContentSHA256 || payloadHashValue == streamingContentSHA256Trailer || + payloadHashValue == streamingUnsignedPayload || len(payloadHashValue) == 64 { + return payloadHashValue + } + + // Try to decode as base64 and convert to hex + if decodedBytes, err := base64.StdEncoding.DecodeString(payloadHashValue); err == nil && len(decodedBytes) == 32 { + return hex.EncodeToString(decodedBytes) + } + + return payloadHashValue +} + // signValues data type represents structured form of AWS Signature V4 header. type signValues struct { Credential credentialHeader @@ -485,6 +504,10 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) } } + // Normalize payload hash to hex format for canonical request + // AWS SigV4 canonical requests always use hex-encoded SHA256 + normalizedPayload := normalizePayloadHash(hashedPayload) + return &v4AuthInfo{ Signature: signV4Values.Signature, AccessKey: signV4Values.Credential.accessKey, @@ -493,7 +516,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode) Region: signV4Values.Credential.scope.region, Service: signV4Values.Credential.scope.service, Scope: signV4Values.Credential.getScope(), - HashedPayload: hashedPayload, + HashedPayload: normalizedPayload, IsPresigned: false, }, s3err.ErrNone } diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 8122b4627..f0e454a51 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -18,37 +18,11 @@ import ( "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/s3api" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" ) -func (s *Server) checkAuth(w http.ResponseWriter, r *http.Request, action s3api.Action, bucketName string) bool { - identityName := s3_constants.GetIdentityNameFromContext(r) - if identityName == "" { - writeError(w, http.StatusUnauthorized, "NotAuthorizedException", "Authentication required") - return false - } - - identityObj := s3_constants.GetIdentityFromContext(r) - if identityObj == nil { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: missing identity") - return false - } - identity, ok := identityObj.(*s3api.Identity) - if !ok { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied: invalid identity") - return false - } - - if !identity.CanDo(action, bucketName, "") { - writeError(w, http.StatusForbidden, "ForbiddenException", "Access denied") - return false - } - return true -} - // FilerClient provides access to the filer for storage operations. type FilerClient interface { WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error @@ -79,17 +53,20 @@ func NewServer(filerClient FilerClient, authenticator S3Authenticator) *Server { // RegisterRoutes registers Iceberg REST API routes on the provided router. 
func (s *Server) RegisterRoutes(router *mux.Router) { - // Configuration endpoint - router.HandleFunc("/v1/config", s.Auth(s.handleConfig)).Methods(http.MethodGet) + // Add middleware to log all requests/responses + router.Use(loggingMiddleware) + + // Configuration endpoint - no auth needed for config + router.HandleFunc("/v1/config", s.handleConfig).Methods(http.MethodGet) - // Namespace endpoints + // Namespace endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleNamespaceExists)).Methods(http.MethodHead) router.HandleFunc("/v1/namespaces/{namespace}", s.Auth(s.handleDropNamespace)).Methods(http.MethodDelete) - // Table endpoints + // Table endpoints - wrapped with Auth middleware router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleListTables)).Methods(http.MethodGet) router.HandleFunc("/v1/namespaces/{namespace}/tables", s.Auth(s.handleCreateTable)).Methods(http.MethodPost) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleLoadTable)).Methods(http.MethodGet) @@ -97,7 +74,7 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - // With prefix support + // With prefix support - wrapped with Auth middleware router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleListNamespaces)).Methods(http.MethodGet) router.HandleFunc("/v1/{prefix}/namespaces", s.Auth(s.handleCreateNamespace)).Methods(http.MethodPost) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}", s.Auth(s.handleGetNamespace)).Methods(http.MethodGet) @@ -110,7 +87,48 @@ func (s *Server) RegisterRoutes(router *mux.Router) { router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleDropTable)).Methods(http.MethodDelete) router.HandleFunc("/v1/{prefix}/namespaces/{namespace}/tables/{table}", s.Auth(s.handleUpdateTable)).Methods(http.MethodPost) - glog.V(0).Infof("Registered Iceberg REST Catalog routes") + // Catch-all for debugging + router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Catch-all route hit: %s %s", r.Method, r.RequestURI) + writeError(w, http.StatusNotFound, "NotFound", "Path not found") + }) + + glog.V(2).Infof("Registered Iceberg REST Catalog routes") +} + +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("Iceberg REST request: %s %s from %s", r.Method, r.RequestURI, r.RemoteAddr) + + // Log all headers for debugging + glog.V(2).Infof("Iceberg REST headers:") + for name, values := range r.Header { + for _, value := range values { + // Redact sensitive headers + if name == "Authorization" && len(value) > 20 { + glog.V(2).Infof(" %s: %s...%s", name, value[:20], value[len(value)-10:]) + } else { + glog.V(2).Infof(" %s: %s", name, value) + } + } + } + + // Create a response writer that captures the status code + wrapped := &responseWriter{ResponseWriter: w} + next.ServeHTTP(wrapped, r) + + glog.V(2).Infof("Iceberg REST response: 
%s %s -> %d", r.Method, r.RequestURI, wrapped.statusCode) + }) +} + +type responseWriter struct { + http.ResponseWriter + statusCode int +} + +func (w *responseWriter) WriteHeader(code int) { + w.statusCode = code + w.ResponseWriter.WriteHeader(code) } func (s *Server) Auth(handler http.HandlerFunc) http.HandlerFunc { @@ -293,8 +311,8 @@ func getBucketFromPrefix(r *http.Request) string { if prefix := vars["prefix"]; prefix != "" { return prefix } - // Default bucket if no prefix - return "default" + // Default bucket if no prefix - use "warehouse" for Iceberg + return "warehouse" } // buildTableBucketARN builds an ARN for a table bucket. @@ -305,25 +323,28 @@ func buildTableBucketARN(bucketName string) string { // handleConfig returns catalog configuration. func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) { - bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } + glog.Infof("handleConfig: START") + glog.Infof("handleConfig: setting Content-Type header") + w.Header().Set("Content-Type", "application/json") config := CatalogConfig{ Defaults: map[string]string{}, Overrides: map[string]string{}, } - writeJSON(w, http.StatusOK, config) + glog.Infof("handleConfig: encoding JSON") + if err := json.NewEncoder(w).Encode(config); err != nil { + glog.Warningf("handleConfig: Failed to encode config: %v", err) + } + glog.Infof("handleConfig: COMPLETE") } // handleListNamespaces lists namespaces in a catalog. func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to list namespaces var resp s3tables.ListNamespacesResponse req := &s3tables.ListNamespacesRequest{ @@ -333,11 +354,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName) }) if err != nil { - glog.V(1).Infof("Iceberg: ListNamespaces error: %v", err) + glog.Infof("Iceberg: ListNamespaces error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -357,11 +378,11 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) { // handleCreateNamespace creates a new namespace. 
func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + var req CreateNamespaceRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "BadRequestException", "Invalid request body") @@ -382,15 +403,18 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, "") + glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName) + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName) }) if err != nil { + glog.Errorf("Iceberg: handleCreateNamespace error: %v", err) + if strings.Contains(err.Error(), "already exists") { writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error()) return } - glog.V(1).Infof("Iceberg: CreateNamespace error: %v", err) + glog.Infof("Iceberg: CreateNamespace error: %v", err) writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error()) return } @@ -418,11 +442,11 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Use S3 Tables manager to get namespace getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, @@ -432,7 +456,7 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -462,11 +486,11 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -475,7 +499,7 @@ func (s *Server) handleNamespaceExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetNamespace", getReq, &getResp, identityName) }) if err != nil { @@ -500,11 +524,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { 
- return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteNamespaceRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -512,10 +536,11 @@ func (s *Server) handleDropNamespace(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteNamespace", deleteReq, nil, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchNamespaceException", fmt.Sprintf("Namespace does not exist: %v", namespace)) return @@ -542,11 +567,11 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_LIST, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + listReq := &s3tables.ListTablesRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -556,7 +581,7 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName) }) if err != nil { @@ -605,11 +630,11 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + // Generate UUID for the new table tableUUID := uuid.New() location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), req.Name) @@ -657,7 +682,7 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, &createResp, identityName) }) if err != nil { @@ -696,11 +721,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -710,10 +735,11 @@ func (s *Server) handleLoadTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return 
s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { + if strings.Contains(err.Error(), "not found") { writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName)) return @@ -771,11 +797,11 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_READ, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -785,7 +811,7 @@ func (s *Server) handleTableExists(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -807,11 +833,11 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_DELETE_BUCKET, bucketName) { - return - } bucketARN := buildTableBucketARN(bucketName) + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) + deleteReq := &s3tables.DeleteTableRequest{ TableBucketARN: bucketARN, Namespace: namespace, @@ -820,7 +846,7 @@ func (s *Server) handleDropTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "DeleteTable", deleteReq, nil, identityName) }) if err != nil { @@ -849,9 +875,10 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { } bucketName := getBucketFromPrefix(r) - if !s.checkAuth(w, r, s3_constants.ACTION_WRITE, bucketName) { - return - } + bucketARN := buildTableBucketARN(bucketName) + + // Extract identity from context + identityName := s3_constants.GetIdentityNameFromContext(r) // Parse the commit request var req CommitTableRequest @@ -860,8 +887,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { return } - bucketARN := buildTableBucketARN(bucketName) - // First, load current table metadata getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, @@ -872,7 +897,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { mgrClient := s3tables.NewManagerClient(client) - return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) }) if err != nil { @@ -985,7 +1010,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { // 1. Write metadata file (this would normally be an S3 PutObject, // but s3tables manager handles the metadata storage logic) // For now, we assume s3tables.UpdateTable handles the reference update. 
- return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, "") + return s.tablesManager.Execute(r.Context(), mgrClient, "UpdateTable", updateReq, nil, identityName) }) if err != nil { @@ -1002,14 +1027,6 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, result) } -// loadTableResultJSON is used for JSON serialization of LoadTableResult. -// It wraps table.Metadata (which is an interface) for proper JSON output. -type loadTableResultJSON struct { - MetadataLocation string `json:"metadata-location,omitempty"` - Metadata table.Metadata `json:"metadata"` - Config iceberg.Properties `json:"config,omitempty"` -} - // newTableMetadata creates a new table.Metadata object with the given parameters. // Uses iceberg-go's MetadataBuilder pattern for proper spec compliance. func newTableMetadata( diff --git a/weed/s3api/s3api_tables.go b/weed/s3api/s3api_tables.go index 54177c4d9..b93d5879f 100644 --- a/weed/s3api/s3api_tables.go +++ b/weed/s3api/s3api_tables.go @@ -632,6 +632,7 @@ func buildUntagResourceRequest(r *http.Request) (interface{}, error) { // which performs granular permission checks based on the specific operation. func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + glog.V(2).Infof("S3Tables: authenticateS3Tables called, iam.isEnabled()=%t", s3a.iam.isEnabled()) if !s3a.iam.isEnabled() { f(w, r) return @@ -640,15 +641,19 @@ func (s3a *S3ApiServer) authenticateS3Tables(f http.HandlerFunc) http.HandlerFun // Use AuthSignatureOnly to authenticate the request without authorizing specific actions identity, errCode := s3a.iam.AuthSignatureOnly(r) if errCode != s3err.ErrNone { + glog.Errorf("S3Tables: AuthSignatureOnly failed: %v", errCode) s3err.WriteErrorResponse(w, r, errCode) return } // Store the authenticated identity in request context if identity != nil && identity.Name != "" { + glog.V(2).Infof("S3Tables: authenticated identity Name=%s Account.Id=%s", identity.Name, identity.Account.Id) ctx := s3_constants.SetIdentityNameInContext(r.Context(), identity.Name) ctx = s3_constants.SetIdentityInContext(ctx, identity) r = r.WithContext(ctx) + } else { + glog.V(2).Infof("S3Tables: authenticated identity is nil or empty name") } f(w, r) diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index dae11d562..39b10ce0e 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -164,9 +164,32 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request, // This is also used as the principal for permission checks, ensuring alignment between // the caller identity and ownership verification when IAM is enabled. 
func (h *S3TablesHandler) getAccountID(r *http.Request) string { + identityRaw := s3_constants.GetIdentityFromContext(r) + if identityRaw != nil { + // Use reflection to access the Account.Id field to avoid import cycle + val := reflect.ValueOf(identityRaw) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + if val.Kind() == reflect.Struct { + accountField := val.FieldByName("Account") + if accountField.IsValid() && !accountField.IsNil() { + accountVal := accountField.Elem() + if accountVal.Kind() == reflect.Struct { + idField := accountVal.FieldByName("Id") + if idField.IsValid() && idField.Kind() == reflect.String { + id := idField.String() + return id + } + } + } + } + } + if identityName := s3_constants.GetIdentityNameFromContext(r); identityName != "" { return identityName } + if accountID := r.Header.Get(s3_constants.AmzAccountId); accountID != "" { return accountID } diff --git a/weed/s3api/s3tables/handler_namespace.go b/weed/s3api/s3tables/handler_namespace.go index a732edc6e..492a53241 100644 --- a/weed/s3api/s3tables/handler_namespace.go +++ b/weed/s3api/s3tables/handler_namespace.go @@ -9,13 +9,16 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) // handleCreateNamespace creates a new namespace in a table bucket func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { + glog.Errorf("S3Tables: handleCreateNamespace called") var req CreateNamespaceRequest if err := h.readRequestBody(r, &req); err != nil { + glog.Errorf("S3Tables: handleCreateNamespace failed to read request body: %v", err) h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) return err } @@ -83,12 +86,14 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName) principal := h.getAccountID(r) identityActions := getIdentityActions(r) + glog.Infof("S3Tables: CreateNamespace permission check - principal=%s, owner=%s, actions=%v", principal, bucketMetadata.OwnerAccountID, identityActions) if !CheckPermissionWithContext("CreateNamespace", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{ TableBucketName: bucketName, Namespace: namespaceName, TableBucketTags: bucketTags, IdentityActions: identityActions, }) { + glog.Infof("S3Tables: Permission denied for CreateNamespace - principal=%s, owner=%s", principal, bucketMetadata.OwnerAccountID) h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace in this bucket") return ErrAccessDenied } diff --git a/weed/s3api/s3tables/permissions.go b/weed/s3api/s3tables/permissions.go index af28c0a88..17d3b1a04 100644 --- a/weed/s3api/s3tables/permissions.go +++ b/weed/s3api/s3tables/permissions.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" ) @@ -110,6 +111,8 @@ func CheckPermissionWithContext(operation, principal, owner, resourcePolicy, res return true } + glog.V(2).Infof("S3Tables: CheckPermission operation=%s principal=%s owner=%s", operation, principal, owner) + return checkPermission(operation, principal, owner, resourcePolicy, resourceARN, ctx) }
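
Note (not part of the patch): a minimal standalone Go sketch of the base64-to-hex normalization described in the commit message. The function name here is illustrative; the real normalizePayloadHash() in weed/s3api/auth_signature_v4.go additionally passes through the SigV4 sentinel values (UNSIGNED-PAYLOAD, streaming payload markers, the empty-body hash).

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// normalizePayloadHashSketch converts a base64-encoded SHA-256 digest into the
// lowercase hex form expected in SigV4 canonical requests. A 64-character value
// is assumed to already be hex-encoded and is returned unchanged; anything that
// does not decode to 32 bytes is also returned unchanged.
func normalizePayloadHashSketch(v string) string {
	if len(v) == 64 { // already a hex-encoded SHA-256 digest
		return v
	}
	if raw, err := base64.StdEncoding.DecodeString(v); err == nil && len(raw) == 32 {
		return hex.EncodeToString(raw)
	}
	return v
}

func main() {
	sum := sha256.Sum256([]byte("example payload"))
	b64 := base64.StdEncoding.EncodeToString(sum[:])
	hexForm := hex.EncodeToString(sum[:])
	// A client sending the base64 form in X-Amz-Content-Sha256 normalizes to the
	// same hex value used when building the canonical request.
	fmt.Println(normalizePayloadHashSketch(b64) == hexForm)   // true
	fmt.Println(normalizePayloadHashSketch(hexForm) == hexForm) // true
}
```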