diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml
index dd195b63a..b6e9e69f7 100644
--- a/.github/workflows/s3-tables-tests.yml
+++ b/.github/workflows/s3-tables-tests.yml
@@ -394,6 +394,75 @@ jobs:
           path: test/s3tables/sts_integration/test-output.log
           retention-days: 3
 
+  lakekeeper-integration-tests:
+    name: Lakekeeper Integration Tests
+    runs-on: ubuntu-22.04
+    timeout-minutes: 30
+
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v6
+
+      - name: Set up Go
+        uses: actions/setup-go@v6
+        with:
+          go-version-file: 'go.mod'
+        id: go
+
+      - name: Set up Docker
+        uses: docker/setup-buildx-action@v3
+
+      - name: Pre-pull Python image
+        run: docker pull python:3
+
+      - name: Pre-pull LocalStack image (if needed)
+        run: docker pull localstack/localstack:latest || true
+
+      - name: Run go mod tidy
+        run: go mod tidy
+
+      - name: Install SeaweedFS
+        run: |
+          go install -buildvcs=false ./weed
+
+      - name: Run Lakekeeper Integration Tests
+        timeout-minutes: 25
+        working-directory: test/s3tables/lakekeeper
+        run: |
+          set -x
+          set -o pipefail
+          echo "=== System Information ==="
+          uname -a
+          free -h
+          df -h
+          echo "=== Starting Lakekeeper Integration Tests ==="
+
+          # Run Lakekeeper integration tests
+          go test -v -timeout 20m . 2>&1 | tee test-output.log || {
+            echo "Lakekeeper integration tests failed"
+            exit 1
+          }
+
+      - name: Show test output on failure
+        if: failure()
+        working-directory: test/s3tables/lakekeeper
+        run: |
+          echo "=== Test Output ==="
+          if [ -f test-output.log ]; then
+            tail -200 test-output.log
+          fi
+
+          echo "=== Process information ==="
+          ps aux | grep -E "(weed|test|docker)" || true
+
+      - name: Upload test logs on failure
+        if: failure()
+        uses: actions/upload-artifact@v6
+        with:
+          name: lakekeeper-integration-test-logs
+          path: test/s3tables/lakekeeper/test-output.log
+          retention-days: 3
+
   s3-tables-build-verification:
     name: S3 Tables Build Verification
     runs-on: ubuntu-22.04
diff --git a/test/s3tables/lakekeeper/lakekeeper_test.go b/test/s3tables/lakekeeper/lakekeeper_test.go
new file mode 100644
index 000000000..67cab1139
--- /dev/null
+++ b/test/s3tables/lakekeeper/lakekeeper_test.go
@@ -0,0 +1,366 @@
+package lakekeeper
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/test/s3tables/testutil"
+)
+
+// TestEnvironment holds the paths, ports, credentials and process handle of the
+// SeaweedFS instance that the Lakekeeper integration test runs against.
+type TestEnvironment struct {
+	seaweedDir     string
+	weedBinary     string
+	dataDir        string
+	bindIP         string
+	s3Port         int
+	s3GrpcPort     int
+	masterPort     int
+	masterGrpcPort int
+	filerPort      int
+	filerGrpcPort  int
+	volumePort     int
+	volumeGrpcPort int
+	weedProcess    *exec.Cmd
+	weedCancel     context.CancelFunc
+	accessKey      string
+	secretKey      string
+}
+
+// TestLakekeeperIntegration starts SeaweedFS with an STS-enabled IAM config and
+// runs a boto3 client in Docker that assumes a role and uses the vended
+// credentials against the S3 endpoint.
+func TestLakekeeperIntegration(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	if !testutil.HasDocker() {
+		t.Skip("Docker not available, skipping Lakekeeper integration test")
+	}
+
+	env := NewTestEnvironment(t)
+	defer env.Cleanup(t)
+
+	fmt.Printf(">>> Starting SeaweedFS with Lakekeeper configuration...\n")
+	env.StartSeaweedFS(t)
+	fmt.Printf(">>> SeaweedFS started.\n")
+
+	// Run python script in docker to test STS and S3 operations
+	runLakekeeperRepro(t, env)
+}
+
+// NewTestEnvironment locates the weed binary, creates a temporary data
+// directory and reserves ports. It skips the test when no weed binary is found.
+func NewTestEnvironment(t *testing.T) *TestEnvironment {
+	t.Helper()
+
+	wd, err := os.Getwd()
+	if err != nil {
+		t.Fatalf("Failed to get working directory: %v", err)
+	}
+
+	// Walk up (at most six levels) to the directory that contains go.mod.
+	seaweedDir := wd
+	for i := 0; i < 6; i++ {
+		if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
+			break
+		}
+		seaweedDir = filepath.Dir(seaweedDir)
+	}
+
+	// Prefer the in-tree binary and fall back to whatever "weed" is on PATH.
+	weedBinary := filepath.Join(seaweedDir, "weed", "weed")
+	if _, err := os.Stat(weedBinary); err != nil {
+		weedBinary = "weed"
+		if _, err := exec.LookPath(weedBinary); err != nil {
+			t.Skip("weed binary not found, skipping integration test")
+		}
+	}
+
+	dataDir, err := os.MkdirTemp("", "seaweed-lakekeeper-test-*")
+	if err != nil {
+		t.Fatalf("Failed to create temp dir: %v", err)
+	}
+
+	bindIP := testutil.FindBindIP()
+
+	masterPort, masterGrpcPort := testutil.MustFreePortPair(t, "Master")
+	volumePort, volumeGrpcPort := testutil.MustFreePortPair(t, "Volume")
+	filerPort, filerGrpcPort := testutil.MustFreePortPair(t, "Filer")
+	s3Port, s3GrpcPort := testutil.MustFreePortPair(t, "S3")
+
+	return &TestEnvironment{
+		seaweedDir:     seaweedDir,
+		weedBinary:     weedBinary,
+		dataDir:        dataDir,
+		bindIP:         bindIP,
+		s3Port:         s3Port,
+		s3GrpcPort:     s3GrpcPort,
+		masterPort:     masterPort,
+		masterGrpcPort: masterGrpcPort,
+		filerPort:      filerPort,
+		filerGrpcPort:  filerGrpcPort,
+		volumePort:     volumePort,
+		volumeGrpcPort: volumeGrpcPort,
+		accessKey:      "admin",
+		secretKey:      "admin",
+	}
+}
+
+// StartSeaweedFS writes the IAM/STS configuration into the data directory,
+// launches "weed mini" with it and waits (30s budget) for the S3 /status endpoint.
+func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
+	t.Helper()
+
+	iamConfigPath := filepath.Join(env.dataDir, "iam.json")
+	// Note: signingKey must be base64 encoded for []byte JSON unmarshaling
+	iamConfig := fmt.Sprintf(`{
+  "identities": [
+    {
+      "name": "admin",
+      "credentials": [
+        {
+          "accessKey": "%s",
+          "secretKey": "%s"
+        }
+      ],
+      "actions": ["Admin", "Read", "List", "Tagging", "Write"]
+    }
+  ],
+  "sts": {
+    "tokenDuration": "12h",
+    "maxSessionLength": "24h",
+    "issuer": "seaweedfs-sts",
+    "signingKey": "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz"
+  },
+  "roles": [
+    {
+      "roleName": "LakekeeperVendedRole",
+      "roleArn": "arn:aws:iam::000000000000:role/LakekeeperVendedRole",
+      "trustPolicy": {
+        "Version": "2012-10-17",
+        "Statement": [
+          {
+            "Effect": "Allow",
+            "Principal": "*",
+            "Action": "sts:AssumeRole"
+          }
+        ]
+      },
+      "attachedPolicies": ["FullAccess"]
+    }
+  ],
+  "policies": [
+    {
+      "name": "FullAccess",
+      "document": {
+        "Version": "2012-10-17",
+        "Statement": [
+          {
+            "Effect": "Allow",
+            "Action": "*",
+            "Resource": "*"
+          }
+        ]
+      }
+    }
+  ]
+}`, env.accessKey, env.secretKey)
+
+	if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil {
+		t.Fatalf("Failed to create IAM config: %v", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	env.weedCancel = cancel
+
+	// Start weed mini with both S3 config (standard IAM) and IAM config (advanced IAM/STS)
+	cmd := exec.CommandContext(ctx, env.weedBinary, "-v", "4", "mini",
+		"-master.port", fmt.Sprintf("%d", env.masterPort),
+		"-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort),
+		"-volume.port", fmt.Sprintf("%d", env.volumePort),
+		"-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort),
+		"-filer.port", fmt.Sprintf("%d", env.filerPort),
+		"-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort),
+		"-s3.port", fmt.Sprintf("%d", env.s3Port),
+		"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort),
+		"-s3.config", iamConfigPath,
+		"-s3.iam.config", iamConfigPath,
+		"-s3.iam.readOnly=false",
+		"-ip", env.bindIP,
+		"-ip.bind", "0.0.0.0",
+		"-dir", env.dataDir,
+	)
+	cmd.Dir = env.dataDir
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+
+	if err := cmd.Start(); err != nil {
+		t.Fatalf("Failed to start SeaweedFS: %v", err)
+	}
+	env.weedProcess = cmd
+
+	if !testutil.WaitForService(fmt.Sprintf("http://localhost:%d/status", env.s3Port), 30*time.Second) {
+		t.Fatalf("S3 API failed to become ready")
+	}
+}
+
+// Cleanup stops the SeaweedFS process, if one was started, and removes the
+// temporary data directory.
+func (env *TestEnvironment) Cleanup(t *testing.T) {
+	t.Helper()
+	if env.weedCancel != nil {
+		env.weedCancel()
+	}
+	if env.weedProcess != nil {
+		// Wait blocks until the cancelled process has exited, so no sleep is
+		// needed; its error (normally just the kill) is deliberately ignored.
+		_ = env.weedProcess.Wait()
+	}
+	if env.dataDir != "" {
+		_ = os.RemoveAll(env.dataDir)
+	}
+}
+
+// runLakekeeperRepro writes a boto3 script into the data directory and runs it in
+// a python:3 container. The script calls AssumeRole, checks that the returned
+// access key starts with "ASIA", then creates and lists a bucket with the
+// vended credentials. Any failure, or a run longer than 5 minutes, fails the test.
+func runLakekeeperRepro(t *testing.T, env *TestEnvironment) {
+	t.Helper()
+
+	scriptContent := fmt.Sprintf(`
+import boto3
+import botocore.config
+import botocore
+from botocore.exceptions import ClientError
+import os
+import sys
+import time
+import logging
+
+# Enable botocore debug logging to see signature calculation
+logging.basicConfig(level=logging.DEBUG)
+botocore.session.get_session().set_debug_logger()
+
+print("Starting Lakekeeper repro test...")
+
+endpoint_url = "http://host.docker.internal:%d"
+access_key = "%s"
+secret_key = "%s"
+region = "us-east-1"
+
+print(f"Connecting to {endpoint_url}")
+
+try:
+    config = botocore.config.Config(
+        retries={'max_attempts': 3}
+    )
+    sts = boto3.client(
+        'sts',
+        endpoint_url=endpoint_url,
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        region_name=region,
+        config=config
+    )
+
+    role_arn = "arn:aws:iam::000000000000:role/LakekeeperVendedRole"
+    session_name = "lakekeeper-session"
+
+    print(f"Calling AssumeRole on {role_arn} with POST body...")
+
+    # Standard boto3 call sends parameters in POST body
+    response = sts.assume_role(
+        RoleArn=role_arn,
+        RoleSessionName=session_name
+    )
+
+    creds = response['Credentials']
+    access_key_id = creds['AccessKeyId']
+    secret_access_key = creds['SecretAccessKey']
+    session_token = creds['SessionToken']
+
+    print(f"Success! Got credentials with prefix: {access_key_id[:4]}")
+
+    if not access_key_id.startswith("ASIA"):
+        print(f"FAILED: Expected ASIA prefix, got {access_key_id}")
+        sys.exit(1)
+
+    print("Verifying S3 operations with vended credentials...")
+    s3 = boto3.client(
+        's3',
+        endpoint_url=endpoint_url,
+        aws_access_key_id=access_key_id,
+        aws_secret_access_key=secret_access_key,
+        aws_session_token=session_token,
+        region_name=region,
+        config=config
+    )
+
+    bucket = "lakekeeper-vended-bucket"
+    print(f"Creating bucket {bucket}...")
+    s3.create_bucket(Bucket=bucket)
+
+    print("Listing buckets...")
+    response = s3.list_buckets()
+    buckets = [b['Name'] for b in response['Buckets']]
+    print(f"Found buckets: {buckets}")
+
+    if bucket not in buckets:
+        print(f"FAILED: Bucket {bucket} not found in list")
+        sys.exit(1)
+
+    print("SUCCESS: Lakekeeper flow verified!")
+    sys.exit(0)
+
+except Exception as e:
+    print(f"FAILED: {e}")
+    # Print more details if it is a ClientError
+    if hasattr(e, 'response'):
+        print(f"Response: {e.response}")
+    sys.exit(1)
+`, env.s3Port, env.accessKey, env.secretKey)
+
+	scriptPath := filepath.Join(env.dataDir, "lakekeeper_repro.py")
+	if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {
+		t.Fatalf("Failed to write python script: %v", err)
+	}
+
+	containerName := "seaweed-lakekeeper-client-" + fmt.Sprintf("%d", time.Now().UnixNano())
+
+	// Create a context with timeout for the docker run command
+	dockerCtx, dockerCancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer dockerCancel()
+
+	// On timeout only the docker CLI is killed; the container keeps running in
+	// the daemon. Force-remove it on every exit path. The error is ignored
+	// because --rm has normally removed the container already.
+	defer func() {
+		_ = exec.Command("docker", "rm", "-f", containerName).Run()
+	}()
+
+	cmd := exec.CommandContext(dockerCtx, "docker", "run", "--rm",
+		"--name", containerName,
+		"--add-host", "host.docker.internal:host-gateway",
+		"-v", fmt.Sprintf("%s:/work", env.dataDir),
+		"python:3",
+		"/bin/bash", "-c", "pip install boto3 && python /work/lakekeeper_repro.py",
+	)
+
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		if dockerCtx.Err() == context.DeadlineExceeded {
+			t.Fatalf("Lakekeeper repro client timed out after 5 minutes\nOutput:\n%s", string(output))
+		}
+		t.Fatalf("Lakekeeper repro client failed: %v\nOutput:\n%s", err, string(output))
+	}
+	t.Logf("Lakekeeper repro client output:\n%s", string(output))
+}
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index d305f8b46..e34053a43 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -1271,6 +1271,7 @@ func (iam *IdentityAccessManagement) authRequestWithAuthType(r *http.Request, ac
 // the specific IAM action (e.g., self-service vs admin operations).
 // Returns the authenticated identity and any signature verification error.
 func (iam *IdentityAccessManagement) AuthSignatureOnly(r *http.Request) (*Identity, s3err.ErrorCode) {
+
 	var identity *Identity
 	var s3Err s3err.ErrorCode
 	var authType string
diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go
index d30b5bf17..9e58daf47 100644
--- a/weed/s3api/auth_signature_v4.go
+++ b/weed/s3api/auth_signature_v4.go
@@ -79,7 +79,7 @@ func streamHashRequestBody(r *http.Request, sizeLimit int64) (string, error) {
 		return "", err
 	}
 
-	r.Body = io.NopCloser(&bodyBuffer)
+	r.Body = io.NopCloser(bytes.NewReader(bodyBuffer.Bytes()))
 
 	if bodyBuffer.Len() == 0 {
 		return emptySHA256, nil
diff --git a/weed/s3api/s3api_sts.go b/weed/s3api/s3api_sts.go
index 943e67929..d6fac9b0b 100644
--- a/weed/s3api/s3api_sts.go
+++ b/weed/s3api/s3api_sts.go
@@ -5,8 +5,6 @@ package s3api
 // AWS SDKs to obtain temporary credentials using OIDC/JWT tokens.
 
 import (
-	"crypto/rand"
-	"encoding/base64"
 	"encoding/xml"
 	"errors"
 	"fmt"
@@ -488,24 +486,14 @@ func (h *STSHandlers) prepareSTSCredentials(roleArn, roleSessionName, principalA
 		return STSCredentials{}, nil, fmt.Errorf("failed to generate session token: %w", err)
 	}
 
-	// Generate temporary credentials (cryptographically secure)
-	// AccessKeyId: ASIA + 16 chars hex
-	// SecretAccessKey: 40 chars base64
-	randBytes := make([]byte, 30) // Sufficient for both
-	if _, err := rand.Read(randBytes); err != nil {
-		return STSCredentials{}, nil, fmt.Errorf("failed to generate random bytes: %w", err)
-	}
-
-	// Generate AccessKeyId (ASIA + 16 upper-hex chars)
-	// We use 8 bytes (16 hex chars)
-	accessKeyId := "ASIA" + fmt.Sprintf("%X", randBytes[:8])
-
-	// Generate SecretAccessKey: 30 random bytes, base64-encoded to a 40-character string
-	secretBytes := make([]byte, 30)
-	if _, err := rand.Read(secretBytes); err != nil {
-		return STSCredentials{}, nil, fmt.Errorf("failed to generate secret bytes: %w", err)
+	// Generate temporary credentials (deterministic based on sessionId)
+	stsCredGen := sts.NewCredentialGenerator()
+	stsCredsDet, err := stsCredGen.GenerateTemporaryCredentials(sessionId, expiration)
+	if err != nil {
+		return STSCredentials{}, nil, fmt.Errorf("failed to generate temporary credentials: %w", err)
 	}
-	secretAccessKey := base64.StdEncoding.EncodeToString(secretBytes)
+	accessKeyId := stsCredsDet.AccessKeyId
+	secretAccessKey := stsCredsDet.SecretAccessKey
 
 	// Get account ID from STS config or use default
 	accountId := defaultAccountId