diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index b6e9e69f7..dd4d541e3 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -194,6 +194,67 @@ jobs: path: test/s3tables/catalog_trino/test-output.log retention-days: 3 + polaris-integration-tests: + name: Polaris Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Pre-pull Polaris image + run: docker pull apache/polaris:latest + + - name: Run Polaris Integration Tests + timeout-minutes: 25 + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting Polaris Tests ===" + + go test -v -timeout 20m ./test/s3tables/polaris 2>&1 | tee test/s3tables/polaris/test-output.log || { + echo "Polaris integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/polaris + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: polaris-test-logs + path: test/s3tables/polaris/test-output.log + retention-days: 3 + spark-iceberg-catalog-tests: name: Spark Iceberg Catalog Integration Tests runs-on: ubuntu-22.04 diff --git a/test/s3tables/polaris/polaris_env_test.go b/test/s3tables/polaris/polaris_env_test.go new file mode 100644 index 000000000..2a3496e2f --- /dev/null +++ b/test/s3tables/polaris/polaris_env_test.go @@ -0,0 +1,377 @@ +package polaris + +import ( + "bytes" + "context" 
+ "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" +) + +const ( + polarisImage = "apache/polaris:latest" + polarisRealm = "POLARIS" + polarisRootClientID = "root" + polarisRootClientSecret = "s3cr3t" + polarisRegion = "us-east-1" + polarisRoleArn = "arn:aws:iam::000000000000:role/LakekeeperVendedRole" + polarisSigningKey = "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" // gitleaks:allow - test signing key +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + polarisPort int + polarisAdminPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + polarisContainer string + accessKey string + secretKey string +} + +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + if !testutil.HasDocker() { + t.Skip("Docker is required for Polaris integration tests") + } + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + if _, err := os.Stat(weedBinary); err != nil { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-polaris-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := 
testutil.FindBindIP() + + masterPort, masterGrpcPort := testutil.MustFreePortPair(t, "Master") + volumePort, volumeGrpcPort := testutil.MustFreePortPair(t, "Volume") + filerPort, filerGrpcPort := testutil.MustFreePortPair(t, "Filer") + s3Port, s3GrpcPort := testutil.MustFreePortPair(t, "S3") + polarisPort, polarisAdminPort := testutil.MustFreePortPair(t, "Polaris") + + return &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + s3Port: s3Port, + s3GrpcPort: s3GrpcPort, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + filerPort: filerPort, + filerGrpcPort: filerGrpcPort, + volumePort: volumePort, + volumeGrpcPort: volumeGrpcPort, + polarisPort: polarisPort, + polarisAdminPort: polarisAdminPort, + accessKey: "admin", + secretKey: "admin", + } +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + iamConfigPath := filepath.Join(env.dataDir, "iam.json") + iamConfig := fmt.Sprintf(`{ + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "actions": ["Admin", "Read", "List", "Tagging", "Write"] + } + ], + "sts": { + "tokenDuration": "12h", + "maxSessionLength": "24h", + "issuer": "seaweedfs-sts", + "signingKey": "%s" + }, + "roles": [ + { + "roleName": "LakekeeperVendedRole", + "roleArn": "%s", + "trustPolicy": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": "*", + "Action": "sts:AssumeRole" + } + ] + }, + "attachedPolicies": ["FullAccess"] + } + ], + "policies": [ + { + "name": "FullAccess", + "document": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "*", + "Resource": "*" + } + ] + } + } + ] +}`, env.accessKey, env.secretKey, polarisSigningKey, polarisRoleArn) + + if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + ctx, cancel := 
context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "-v", "4", "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.config", iamConfigPath, + "-s3.iam.config", iamConfigPath, + "-s3.iam.readOnly=false", + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + if !testutil.WaitForService(fmt.Sprintf("http://localhost:%d/status", env.s3Port), 30*time.Second) { + t.Fatalf("S3 API failed to become ready") + } +} + +func (env *TestEnvironment) StartPolaris(t *testing.T) { + t.Helper() + + containerName := fmt.Sprintf("seaweed-polaris-%d", time.Now().UnixNano()) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + cmd := exec.CommandContext(ctx, "docker", "run", "-d", "--rm", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-p", fmt.Sprintf("%d:8181", env.polarisPort), + "-p", fmt.Sprintf("%d:8182", env.polarisAdminPort), + "-e", fmt.Sprintf("AWS_REGION=%s", polarisRegion), + "-e", fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", env.accessKey), + "-e", fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", env.secretKey), + "-e", fmt.Sprintf("POLARIS_BOOTSTRAP_CREDENTIALS=%s,%s,%s", polarisRealm, polarisRootClientID, polarisRootClientSecret), + "-e", fmt.Sprintf("polaris.realm-context.realms=%s", polarisRealm), + "-e", "quarkus.otel.sdk.disabled=true", + 
polarisImage, + ) + + output, err := cmd.CombinedOutput() + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("Timed out waiting for Polaris container: %v\nOutput:\n%s", ctx.Err(), string(output)) + } + t.Fatalf("Failed to start Polaris: %v\nOutput:\n%s", err, string(output)) + } + env.polarisContainer = containerName + + if !testutil.WaitForService(fmt.Sprintf("http://localhost:%d/q/health", env.polarisAdminPort), 60*time.Second) { + logCtx, logCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer logCancel() + logs, _ := exec.CommandContext(logCtx, "docker", "logs", env.polarisContainer).CombinedOutput() + t.Fatalf("Polaris failed to become ready\nLogs:\n%s", string(logs)) + } +} + +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + if env.weedCancel != nil { + env.weedCancel() + } + if env.weedProcess != nil { + time.Sleep(1 * time.Second) + _ = env.weedProcess.Wait() + } + if env.polarisContainer != "" { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = exec.CommandContext(ctx, "docker", "rm", "-f", env.polarisContainer).Run() + } + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +func (env *TestEnvironment) polarisEndpoint() string { + return fmt.Sprintf("http://127.0.0.1:%d", env.polarisPort) +} + +func (env *TestEnvironment) s3Endpoint() string { + return fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) +} + +func (env *TestEnvironment) s3InternalEndpoint() string { + return fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) +} + +type polarisHTTPClient struct { + baseURL string + realm string + token string + httpClient *http.Client +} + +func newPolarisHTTPClient(baseURL, realm, token string) *polarisHTTPClient { + return &polarisHTTPClient{ + baseURL: baseURL, + realm: realm, + token: token, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (c *polarisHTTPClient) doJSON(ctx context.Context, method, 
path string, body interface{}, out interface{}) error { + return c.doJSONWithHeaders(ctx, method, path, body, out, nil) +} + +func (c *polarisHTTPClient) doJSONWithHeaders(ctx context.Context, method, path string, body interface{}, out interface{}, headers map[string]string) error { + var reader io.Reader + if body != nil { + encoded, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("encode request body: %w", err) + } + reader = bytes.NewReader(encoded) + } + + req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, reader) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Accept", "application/json") + if c.realm != "" { + req.Header.Set("Polaris-Realm", c.realm) + } + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } + for key, value := range headers { + req.Header.Set(key, value) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + bodyBytes, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("request failed with status %d and could not read response body: %w", resp.StatusCode, readErr) + } + return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes))) + } + if out == nil { + return nil + } + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode response: %w", err) + } + return nil +} + +func newS3Client(ctx context.Context, endpoint, region string, creds aws.Credentials, pathStyle bool) (*s3.Client, error) { + resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + if service == s3.ServiceID { + return aws.Endpoint{ + URL: endpoint, + SigningRegion: region, + HostnameImmutable: 
true, + }, nil + } + return aws.Endpoint{}, &aws.EndpointNotFoundError{} + }) + + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion(region), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken)), + config.WithEndpointResolverWithOptions(resolver), + ) + if err != nil { + return nil, err + } + + return s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = pathStyle + }), nil +} diff --git a/test/s3tables/polaris/polaris_test.go b/test/s3tables/polaris/polaris_test.go new file mode 100644 index 000000000..db1fead63 --- /dev/null +++ b/test/s3tables/polaris/polaris_test.go @@ -0,0 +1,805 @@ +package polaris + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +type polarisSession struct { + catalogName string + bucketName string + token string + baseLocation string +} + +type polarisTableSetup struct { + namespace string + table string + dataKeyPrefix string + s3Client *s3.Client +} + +type polarisCatalogClient struct { + http *polarisHTTPClient + catalog string +} + +type polarisCredentials struct { + ClientID string `json:"clientId"` + ClientSecret string `json:"clientSecret"` +} + +type createPrincipalResponse struct { + Credentials polarisCredentials `json:"credentials"` +} + +type createCatalogRequest struct { + Catalog polarisCatalog `json:"catalog"` +} + +type polarisCatalog struct { + Name string `json:"name"` + Type string `json:"type"` + ReadOnly bool `json:"readOnly"` + Properties map[string]string `json:"properties"` + StorageConfigInfo polarisStorageConfig `json:"storageConfigInfo"` +} + +type polarisStorageConfig struct { + StorageType string `json:"storageType"` + AllowedLocations []string `json:"allowedLocations"` + Endpoint string 
`json:"endpoint"` + EndpointInternal string `json:"endpointInternal"` + StsEndpoint string `json:"stsEndpoint"` + PathStyleAccess bool `json:"pathStyleAccess"` + RoleArn string `json:"roleArn"` + Region string `json:"region"` +} + +type createNamespaceRequest struct { + Namespace []string `json:"namespace"` +} + +type createTableRequest struct { + Name string `json:"name"` + Location string `json:"location"` + Schema icebergSchema `json:"schema"` + PartitionSpec icebergPartition `json:"partition-spec"` + SortOrder icebergSortOrder `json:"sort-order"` + Properties map[string]string `json:"properties"` +} + +type icebergSchema struct { + Type string `json:"type"` + SchemaID int `json:"schema-id"` + Fields []icebergSchemaField `json:"fields"` +} + +type icebergSchemaField struct { + ID int `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + Required bool `json:"required"` +} + +type icebergPartition struct { + SpecID int `json:"spec-id"` + Fields []icebergPartitionField `json:"fields"` +} + +type icebergPartitionField struct { + SourceID int `json:"source-id"` + FieldID int `json:"field-id"` + Name string `json:"name"` + Transform string `json:"transform"` +} + +type icebergSortOrder struct { + OrderID int `json:"order-id"` + Fields []icebergSortField `json:"fields"` +} + +type icebergSortField struct { + SourceID int `json:"source-id"` + Direction string `json:"direction"` + NullOrder string `json:"null-order"` +} + +type loadTableResponse struct { + Config map[string]string `json:"config"` + StorageCredentials []storageCredential `json:"storage-credentials"` +} + +type loadCredentialsResponse struct { + StorageCredentials []storageCredential `json:"storage-credentials"` +} + +type storageCredential struct { + Prefix string `json:"prefix"` + Config map[string]string `json:"config"` +} + +func bootstrapPolarisTest(t *testing.T, env *TestEnvironment) (context.Context, context.CancelFunc, polarisSession, *polarisTableSetup, func()) { + t.Helper() + + 
t.Logf(">>> Starting SeaweedFS with Polaris configuration...") + env.StartSeaweedFS(t) + t.Logf(">>> SeaweedFS started.") + + t.Logf(">>> Starting Polaris...") + env.StartPolaris(t) + t.Logf(">>> Polaris started.") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + session := newPolarisSession(t, ctx, env) + setup, cleanup := setupPolarisTable(t, ctx, env, session) + + return ctx, cancel, session, setup, cleanup +} + +func TestPolarisIntegration(t *testing.T) { + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + ctx, cancel, session, setup, cleanup := bootstrapPolarisTest(t, env) + defer cancel() + defer cleanup() + + objectKey := fmt.Sprintf("%s/hello-%d.txt", setup.dataKeyPrefix, time.Now().UnixNano()) + payload := []byte("polaris") + + if _, err := setup.s3Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(payload), + }); err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + listObjects, err := setup.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(session.bucketName), + Prefix: aws.String(objectKey), + }) + if err != nil { + t.Fatalf("ListObjectsV2 failed: %v", err) + } + + found := false + for _, obj := range listObjects.Contents { + if aws.ToString(obj.Key) == objectKey { + found = true + break + } + } + if !found { + t.Fatalf("Object %s not found in list", objectKey) + } + + getResp, err := setup.s3Client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + body, err := io.ReadAll(getResp.Body) + _ = getResp.Body.Close() + if err != nil { + t.Fatalf("Read object body failed: %v", err) + } + if !bytes.Equal(body, payload) { + t.Fatalf("Unexpected object payload: got %q want %q", string(body), string(payload)) + } + + if _, err := setup.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ + 
Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + }); err != nil { + t.Fatalf("DeleteObject failed: %v", err) + } +} + +func TestPolarisTableIntegration(t *testing.T) { + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + ctx, cancel, session, setup, cleanup := bootstrapPolarisTest(t, env) + defer cancel() + defer cleanup() + + objectKey := fmt.Sprintf("%s/part-%d.parquet", setup.dataKeyPrefix, time.Now().UnixNano()) + createResp, err := setup.s3Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + t.Fatalf("CreateMultipartUpload failed: %v", err) + } + + uploadID := aws.ToString(createResp.UploadId) + multipartCompleted := false + defer func() { + if uploadID == "" || multipartCompleted { + return + } + _, _ = setup.s3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + }) + }() + + partSize := 5 * 1024 * 1024 + part1 := bytes.Repeat([]byte("a"), partSize) + part2 := bytes.Repeat([]byte("b"), 1024*1024) + + part1Resp, err := setup.s3Client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + PartNumber: aws.Int32(1), + Body: bytes.NewReader(part1), + }) + if err != nil { + t.Fatalf("UploadPart 1 failed: %v", err) + } + + part2Resp, err := setup.s3Client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + PartNumber: aws.Int32(2), + Body: bytes.NewReader(part2), + }) + if err != nil { + t.Fatalf("UploadPart 2 failed: %v", err) + } + + _, err = setup.s3Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + UploadId: aws.String(uploadID), + 
MultipartUpload: &s3types.CompletedMultipartUpload{ + Parts: []s3types.CompletedPart{ + { + ETag: part1Resp.ETag, + PartNumber: aws.Int32(1), + }, + { + ETag: part2Resp.ETag, + PartNumber: aws.Int32(2), + }, + }, + }, + }) + if err != nil { + t.Fatalf("CompleteMultipartUpload failed: %v", err) + } + multipartCompleted = true + + headResp, err := setup.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(session.bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + t.Fatalf("HeadObject after multipart upload failed: %v", err) + } + expectedSize := int64(len(part1) + len(part2)) + if headResp.ContentLength == nil || *headResp.ContentLength != expectedSize { + t.Fatalf("Unexpected content length: got %d want %d", aws.ToInt64(headResp.ContentLength), expectedSize) + } +} + +func newPolarisSession(t *testing.T, ctx context.Context, env *TestEnvironment) polarisSession { + t.Helper() + + adminCreds := aws.Credentials{ + AccessKeyID: env.accessKey, + SecretAccessKey: env.secretKey, + Source: "polaris-admin", + } + adminS3, err := newS3Client(ctx, env.s3Endpoint(), polarisRegion, adminCreds, true) + if err != nil { + t.Fatalf("Create admin S3 client failed: %v", err) + } + + bucketName := fmt.Sprintf("polaris-bucket-%d", time.Now().UnixNano()) + if _, err := adminS3.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + policy := fmt.Sprintf(`{"Version":"2012-10-17","Statement":[{"Sid":"AllowPolarisVendedAccess","Effect":"Allow","Principal":"*","Action":"s3:*","Resource":["arn:aws:s3:::%s","arn:aws:s3:::%s/polaris/*"]}]}`, bucketName, bucketName) + if _, err := adminS3.PutBucketPolicy(ctx, &s3.PutBucketPolicyInput{ + Bucket: aws.String(bucketName), + Policy: aws.String(policy), + }); err != nil { + t.Fatalf("PutBucketPolicy failed: %v", err) + } + + rootToken, err := fetchPolarisToken(ctx, env.polarisEndpoint(), polarisRootClientID, polarisRootClientSecret) + 
if err != nil { + t.Fatalf("Polaris root token request failed: %v", err) + } + + managementClient := newPolarisHTTPClient(env.polarisEndpoint(), polarisRealm, rootToken) + catalogName := fmt.Sprintf("polaris_catalog_%d", time.Now().UnixNano()) + baseLocation := fmt.Sprintf("s3://%s/polaris", bucketName) + + catalogRequest := createCatalogRequest{ + Catalog: polarisCatalog{ + Name: catalogName, + Type: "INTERNAL", + ReadOnly: false, + Properties: map[string]string{ + "default-base-location": baseLocation, + }, + StorageConfigInfo: polarisStorageConfig{ + StorageType: "S3", + AllowedLocations: []string{baseLocation}, + Endpoint: env.s3Endpoint(), + EndpointInternal: env.s3InternalEndpoint(), + StsEndpoint: env.s3InternalEndpoint(), + PathStyleAccess: true, + RoleArn: polarisRoleArn, + Region: polarisRegion, + }, + }, + } + + if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/catalogs", catalogRequest, nil); err != nil { + t.Fatalf("Create catalog failed: %v", err) + } + + principalName := fmt.Sprintf("polaris_user_%d", time.Now().UnixNano()) + principalRoleName := fmt.Sprintf("polaris_principal_role_%d", time.Now().UnixNano()) + catalogRoleName := fmt.Sprintf("polaris_catalog_role_%d", time.Now().UnixNano()) + + var principalResp createPrincipalResponse + if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/principals", map[string]interface{}{ + "principal": map[string]interface{}{ + "name": principalName, + "properties": map[string]string{}, + }, + }, &principalResp); err != nil { + t.Fatalf("Create principal failed: %v", err) + } + if principalResp.Credentials.ClientID == "" || principalResp.Credentials.ClientSecret == "" { + t.Fatalf("Missing principal credentials in response") + } + + if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/principal-roles", map[string]interface{}{ + "principalRole": map[string]interface{}{ + "name": principalRoleName, + "properties": map[string]string{}, + }, + 
}, nil); err != nil { + t.Fatalf("Create principal role failed: %v", err) + } + + if err := managementClient.doJSON(ctx, http.MethodPost, fmt.Sprintf("/api/management/v1/catalogs/%s/catalog-roles", url.PathEscape(catalogName)), map[string]interface{}{ + "catalogRole": map[string]interface{}{ + "name": catalogRoleName, + "properties": map[string]string{}, + }, + }, nil); err != nil { + t.Fatalf("Create catalog role failed: %v", err) + } + + if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/principals/%s/principal-roles", url.PathEscape(principalName)), map[string]interface{}{ + "principalRole": map[string]interface{}{ + "name": principalRoleName, + }, + }, nil); err != nil { + t.Fatalf("Assign principal role failed: %v", err) + } + + if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/principal-roles/%s/catalog-roles/%s", url.PathEscape(principalRoleName), url.PathEscape(catalogName)), map[string]interface{}{ + "catalogRole": map[string]interface{}{ + "name": catalogRoleName, + }, + }, nil); err != nil { + t.Fatalf("Assign catalog role failed: %v", err) + } + + if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/catalogs/%s/catalog-roles/%s/grants", url.PathEscape(catalogName), url.PathEscape(catalogRoleName)), map[string]interface{}{ + "type": "catalog", + "privilege": "CATALOG_MANAGE_CONTENT", + }, nil); err != nil { + t.Fatalf("Grant catalog privilege failed: %v", err) + } + + userToken, err := fetchPolarisToken(ctx, env.polarisEndpoint(), principalResp.Credentials.ClientID, principalResp.Credentials.ClientSecret) + if err != nil { + t.Fatalf("Polaris user token request failed: %v", err) + } + + return polarisSession{ + catalogName: catalogName, + bucketName: bucketName, + token: userToken, + baseLocation: baseLocation, + } +} + +func setupPolarisTable(t *testing.T, ctx context.Context, env *TestEnvironment, session polarisSession) (*polarisTableSetup, 
func()) { + t.Helper() + + catalogClient := newPolarisCatalogClient(env.polarisEndpoint(), polarisRealm, session.token, session.catalogName) + namespace := fmt.Sprintf("polaris_ns_%d", time.Now().UnixNano()) + table := fmt.Sprintf("polaris_table_%d", time.Now().UnixNano()) + + if err := catalogClient.CreateNamespace(ctx, namespace); err != nil { + t.Fatalf("CreateNamespace failed: %v", err) + } + + location := fmt.Sprintf("%s/%s/%s", session.baseLocation, namespace, table) + if err := catalogClient.CreateTable(ctx, namespace, table, location); err != nil { + t.Fatalf("CreateTable failed: %v", err) + } + + loadResp, err := catalogClient.LoadTable(ctx, namespace, table) + if err != nil { + t.Fatalf("LoadTable failed: %v", err) + } + + credsResp, err := catalogClient.LoadCredentials(ctx, namespace, table) + if err != nil { + t.Fatalf("LoadCredentials failed: %v", err) + } + if len(credsResp.StorageCredentials) == 0 { + t.Fatalf("LoadCredentials returned no storage credentials") + } + + credentialSource := &loadTableResponse{ + Config: loadResp.Config, + StorageCredentials: credsResp.StorageCredentials, + } + + dataPrefix := location + "/data" + creds, endpoint, region, pathStyle, err := extractS3Credentials(credentialSource, dataPrefix, env.s3Endpoint(), polarisRegion) + if err != nil { + t.Fatalf("Extract vended credentials failed: %v", err) + } + dataKeyPrefix, err := s3URIToKeyPrefix(dataPrefix, session.bucketName) + if err != nil { + t.Fatalf("Invalid data prefix %s: %v", dataPrefix, err) + } + + s3Client, err := newS3Client(ctx, endpoint, region, creds, pathStyle) + if err != nil { + t.Fatalf("Create vended S3 client failed: %v", err) + } + + cleanup := func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + if err := catalogClient.DeleteTable(cleanupCtx, namespace, table); err != nil { + t.Logf("DeleteTable failed: %v", err) + } + if err := catalogClient.DeleteNamespace(cleanupCtx, namespace); 
err != nil {
+			t.Logf("DeleteNamespace failed: %v", err)
+		}
+	}
+
+	return &polarisTableSetup{
+		namespace:     namespace,
+		table:         table,
+		dataKeyPrefix: dataKeyPrefix,
+		s3Client:      s3Client,
+	}, cleanup
+}
+
+func fetchPolarisToken(ctx context.Context, baseURL, clientID, clientSecret string) (string, error) {
+	form := url.Values{}
+	form.Set("grant_type", "client_credentials")
+	form.Set("client_id", clientID)
+	form.Set("client_secret", clientSecret)
+	form.Set("scope", "PRINCIPAL_ROLE:ALL")
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/api/catalog/v1/oauth/tokens", strings.NewReader(form.Encode()))
+	if err != nil {
+		return "", fmt.Errorf("create token request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	req.Header.Set("Accept", "application/json")
+
+	client := &http.Client{Timeout: 30 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", fmt.Errorf("token request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, readErr := io.ReadAll(resp.Body)
+		if readErr != nil {
+			return "", fmt.Errorf("token request failed with status %d and could not read response body: %w", resp.StatusCode, readErr)
+		}
+		return "", fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+	}
+
+	var tokenResp struct {
+		AccessToken string `json:"access_token"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil {
+		return "", fmt.Errorf("decode token response: %w", err)
+	}
+	if tokenResp.AccessToken == "" {
+		return "", fmt.Errorf("missing access token in response")
+	}
+
+	return tokenResp.AccessToken, nil
+}
+
+func newPolarisCatalogClient(baseURL, realm, token, catalog string) *polarisCatalogClient {
+	return &polarisCatalogClient{
+		http:    newPolarisHTTPClient(baseURL, realm, token),
+		catalog: catalog,
+	}
+}
+
+func (c *polarisCatalogClient) CreateNamespace(ctx context.Context, namespace 
string) error {
	// Polaris implements the Iceberg REST catalog API; the namespace is sent
	// as a string array (here always single-level).
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces", url.PathEscape(c.catalog))
	req := createNamespaceRequest{Namespace: []string{namespace}}
	return c.http.doJSON(ctx, http.MethodPost, path, req, nil)
}

// DeleteNamespace removes the named namespace from the configured catalog.
func (c *polarisCatalogClient) DeleteNamespace(ctx context.Context, namespace string) error {
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s", url.PathEscape(c.catalog), url.PathEscape(namespace))
	return c.http.doJSON(ctx, http.MethodDelete, path, nil, nil)
}

// CreateTable creates an unpartitioned, unsorted format-version-2 Iceberg
// table with a single optional "id" (long) column at the given location.
func (c *polarisCatalogClient) CreateTable(ctx context.Context, namespace, table, location string) error {
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables", url.PathEscape(c.catalog), url.PathEscape(namespace))

	req := createTableRequest{
		Name:     table,
		Location: location,
		Schema: icebergSchema{
			Type:     "struct",
			SchemaID: 0,
			Fields: []icebergSchemaField{
				{
					ID:       1,
					Name:     "id",
					Type:     "long",
					Required: false,
				},
			},
		},
		// Empty partition spec and sort order: the test table is deliberately
		// minimal.
		PartitionSpec: icebergPartition{
			SpecID: 0,
			Fields: []icebergPartitionField{},
		},
		SortOrder: icebergSortOrder{
			OrderID: 0,
			Fields:  []icebergSortField{},
		},
		Properties: map[string]string{
			"format-version": "2",
		},
	}

	return c.http.doJSON(ctx, http.MethodPost, path, req, nil)
}

// DeleteTable drops the table from the catalog.
func (c *polarisCatalogClient) DeleteTable(ctx context.Context, namespace, table string) error {
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table))
	return c.http.doJSON(ctx, http.MethodDelete, path, nil, nil)
}

// LoadTable fetches table metadata, requesting vended storage credentials via
// the X-Iceberg-Access-Delegation header so the response may carry per-table
// S3 credentials alongside the table config.
func (c *polarisCatalogClient) LoadTable(ctx context.Context, namespace, table string) (*loadTableResponse, error) {
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table))
	var resp loadTableResponse
	headers := map[string]string{
		"X-Iceberg-Access-Delegation": "vended-credentials",
	}
	if err := c.http.doJSONWithHeaders(ctx, http.MethodGet, path, nil, &resp, headers); err != nil {
		return nil, err
	}
	return &resp, nil
}

// LoadCredentials calls the dedicated per-table credentials endpoint, as an
// alternative to header-based credential vending in LoadTable.
func (c *polarisCatalogClient) LoadCredentials(ctx context.Context, namespace, table string) (*loadCredentialsResponse, error) {
	path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s/credentials", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table))
	var resp loadCredentialsResponse
	if err := c.http.doJSON(ctx, http.MethodGet, path, nil, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

// extractS3Credentials pulls S3 credentials plus endpoint/region/path-style
// settings out of a loadTableResponse. Because different catalog versions use
// different property spellings, each value is looked up under several
// candidate keys. Returns (credentials, endpoint, region, pathStyle, error).
func extractS3Credentials(load *loadTableResponse, targetPrefix, fallbackEndpoint, fallbackRegion string) (aws.Credentials, string, string, bool, error) {
	configMap, err := selectStorageConfig(load, targetPrefix)
	if err != nil {
		return aws.Credentials{}, "", "", false, err
	}

	// lookup reads from configMap (captured by reference), so reassigning
	// configMap below redirects subsequent lookups to the new map.
	lookup := func(keys ...string) string {
		for _, key := range keys {
			if val, ok := configMap[key]; ok && val != "" {
				return val
			}
		}
		return ""
	}

	accessKey := lookup("s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id")
	secretKey := lookup("s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key")
	sessionToken := lookup("s3.session-token", "aws.session-token", "s3.sessionToken", "aws.sessionToken", "sessionToken", "session_token")
	if accessKey == "" || secretKey == "" {
		// The selected config had no key pair; retry with any credential entry
		// in the response that actually carries both keys.
		if altConfig := findConfigWithKeys(load, targetPrefix); altConfig != nil {
			configMap = altConfig
			accessKey = lookup("s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id")
			secretKey = lookup("s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key")
			sessionToken = lookup("s3.session-token", "aws.session-token", "s3.sessionToken", "aws.sessionToken", "sessionToken", "session_token")
		}
	}
	if accessKey == "" || secretKey == "" {
		return aws.Credentials{}, "", "", false, fmt.Errorf("missing access key or secret in storage credentials")
	}

	endpoint := lookup("s3.endpoint", "s3.endpoint-url", "aws.endpoint")
	if endpoint == "" {
		endpoint = fallbackEndpoint
	}
	// Normalize a bare host:port endpoint to a URL (plain HTTP for the test
	// environment).
	if endpoint != "" && !strings.HasPrefix(endpoint, "http") {
		endpoint = "http://" + endpoint
	}

	region := lookup("s3.region", "aws.region")
	if region == "" {
		region = fallbackRegion
	}

	// Path-style addressing defaults to true (suits non-AWS S3 endpoints such
	// as SeaweedFS) unless the config explicitly says otherwise.
	pathStyle := true
	if value := lookup("s3.path-style-access", "s3.pathStyleAccess", "path-style-access"); value != "" {
		pathStyle = strings.EqualFold(value, "true")
	}

	return aws.Credentials{
		AccessKeyID:     accessKey,
		SecretAccessKey: secretKey,
		SessionToken:    sessionToken,
		Source:          "polaris-vended",
	}, endpoint, region, pathStyle, nil
}

// selectStorageConfig picks which property map to read credentials from:
// with no storage credentials it falls back to the table config, with exactly
// one it uses that, and with several it picks the entry whose prefix is the
// longest match for targetPrefix.
func selectStorageConfig(load *loadTableResponse, targetPrefix string) (map[string]string, error) {
	switch len(load.StorageCredentials) {
	case 0:
		if load.Config == nil {
			return nil, fmt.Errorf("polaris returned no storage credentials or config")
		}
		return load.Config, nil
	case 1:
		cred := load.StorageCredentials[0]
		if cred.Config == nil {
			return nil, fmt.Errorf("storage credential for prefix %s returned nil config", cred.Prefix)
		}
		return cred.Config, nil
	default:
		if targetPrefix == "" {
			return nil, fmt.Errorf("multiple storage credentials (%d) returned but no target prefix provided", len(load.StorageCredentials))
		}
		normalizedTarget := normalizePrefix(targetPrefix)
		if normalizedTarget == "" {
			return nil, fmt.Errorf("target prefix %q normalized to empty string", targetPrefix)
		}
		// Longest-prefix match; an empty credential prefix acts as a catch-all
		// of length 0, used only if nothing more specific matches.
		var bestConfig map[string]string
		bestLen := -1
		for _, cred := range load.StorageCredentials {
			if cred.Config == nil {
				continue
			}
			prefix := normalizePrefix(cred.Prefix)
			if prefix == "" {
				if bestLen < 0 {
					bestLen = 0
					bestConfig = cred.Config
				}
				continue
			}
			// Match on exact equality or a path-segment boundary ("/"), so
			// prefix "a/b" does not match target "a/bc".
			if normalizedTarget == prefix || strings.HasPrefix(normalizedTarget, prefix+"/") {
				if len(prefix) > bestLen {
					bestLen = len(prefix)
					bestConfig = cred.Config
				}
			}
		}
		if bestConfig != nil {
			return bestConfig, nil
		}
		return nil, fmt.Errorf("none of the %d storage credentials matched prefix %s", len(load.StorageCredentials), targetPrefix)
	}
}

// normalizePrefix trims surrounding whitespace and a single trailing slash so
// prefixes compare consistently.
func normalizePrefix(prefix string) string {
	p := strings.TrimSpace(prefix)
	p = strings.TrimSuffix(p, "/")
	return p
}

// findConfigWithKeys is the fallback scan used when the primary selection had
// no access/secret key pair: it returns the config of the best
// prefix-matching storage credential that does contain both keys, or the
// table config if that carries them, or nil.
func findConfigWithKeys(load *loadTableResponse, targetPrefix string) map[string]string {
	normalizedTarget := normalizePrefix(targetPrefix)
	if normalizedTarget == "" {
		return nil
	}

	// hasKeys reports whether the config holds both an access key and a
	// secret key under any of the known spellings.
	hasKeys := func(config map[string]string) bool {
		if config == nil {
			return false
		}
		accessKey := lookupValue(config, "s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id")
		secretKey := lookupValue(config, "s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key")
		return accessKey != "" && secretKey != ""
	}

	// Same longest-prefix-match walk as selectStorageConfig, restricted to
	// entries that actually carry a key pair.
	bestConfig := map[string]string(nil)
	bestLen := -1
	for _, cred := range load.StorageCredentials {
		if !hasKeys(cred.Config) {
			continue
		}
		prefix := normalizePrefix(cred.Prefix)
		if prefix == "" {
			if bestLen < 0 {
				bestLen = 0
				bestConfig = cred.Config
			}
			continue
		}
		if normalizedTarget == prefix || strings.HasPrefix(normalizedTarget, prefix+"/") {
			if len(prefix) > bestLen {
				bestLen = len(prefix)
				bestConfig = cred.Config
			}
		}
	}
	if bestConfig != nil {
		return bestConfig
	}

	// NOTE(review): the first branch below is subsumed by the second
	// (identical return under a stricter condition); kept as-is.
	if len(load.StorageCredentials) == 0 && hasKeys(load.Config) {
		return load.Config
	}
	if hasKeys(load.Config) {
		return load.Config
	}
	return nil
}

// lookupValue returns the first non-empty value found under any of keys.
func lookupValue(config map[string]string, keys ...string) string {
	for _, key := range keys {
		if val, ok := config[key]; ok && val != "" {
			return val
		}
	}
	return ""
}

// s3URIToKeyPrefix extracts the object-key prefix from an s3:// URI rooted at
// the given bucket, e.g. ("s3://b/warehouse/t", "b") -> "warehouse/t".
// It returns an error when the URI does not live under that bucket, or when
// nothing follows the bucket name.
func s3URIToKeyPrefix(uri, bucket string) (string, error) {
	root := "s3://" + bucket + "/"
	rest, ok := strings.CutPrefix(uri, root)
	if !ok {
		return "", fmt.Errorf("uri %q does not match bucket %q", uri, bucket)
	}
	// Tolerate a doubled slash immediately after the bucket name.
	rest = strings.TrimPrefix(rest, "/")
	if rest == "" {
		return "", fmt.Errorf("empty key prefix in uri %q", uri)
	}
	return rest, nil
}