Browse Source
Add Apache Polaris integration tests (#8478)
Add Apache Polaris integration tests (#8478)
* test: add polaris integration test harness * test: add polaris integration coverage * ci: run polaris s3 tables tests * test: harden polaris harness * test: DRY polaris integration tests * ci: pre-pull Polaris image * test: extend Polaris pull timeout * test: refine polaris credentials selection * test: keep Polaris tables inside allowed location * test: use fresh context for polaris cleanup * test: prefer specific Polaris storage credential * test: tolerate Polaris credential variants * test: request Polaris vended credentials * test: load Polaris table credentials * test: allow polaris vended access via bucket policy * test: align Polaris object keys with table locationpull/8305/merge
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 1243 additions and 0 deletions
-
61.github/workflows/s3-tables-tests.yml
-
377test/s3tables/polaris/polaris_env_test.go
-
805test/s3tables/polaris/polaris_test.go
@ -0,0 +1,377 @@ |
|||
package polaris |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"encoding/json" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"os" |
|||
"os/exec" |
|||
"path/filepath" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/aws/aws-sdk-go-v2/aws" |
|||
"github.com/aws/aws-sdk-go-v2/config" |
|||
"github.com/aws/aws-sdk-go-v2/credentials" |
|||
"github.com/aws/aws-sdk-go-v2/service/s3" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/test/s3tables/testutil" |
|||
) |
|||
|
|||
const ( |
|||
polarisImage = "apache/polaris:latest" |
|||
polarisRealm = "POLARIS" |
|||
polarisRootClientID = "root" |
|||
polarisRootClientSecret = "s3cr3t" |
|||
polarisRegion = "us-east-1" |
|||
polarisRoleArn = "arn:aws:iam::000000000000:role/LakekeeperVendedRole" |
|||
polarisSigningKey = "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" // gitleaks:allow - test signing key
|
|||
) |
|||
|
|||
type TestEnvironment struct { |
|||
seaweedDir string |
|||
weedBinary string |
|||
dataDir string |
|||
bindIP string |
|||
s3Port int |
|||
s3GrpcPort int |
|||
masterPort int |
|||
masterGrpcPort int |
|||
filerPort int |
|||
filerGrpcPort int |
|||
volumePort int |
|||
volumeGrpcPort int |
|||
polarisPort int |
|||
polarisAdminPort int |
|||
weedProcess *exec.Cmd |
|||
weedCancel context.CancelFunc |
|||
polarisContainer string |
|||
accessKey string |
|||
secretKey string |
|||
} |
|||
|
|||
func NewTestEnvironment(t *testing.T) *TestEnvironment { |
|||
t.Helper() |
|||
|
|||
if !testutil.HasDocker() { |
|||
t.Skip("Docker is required for Polaris integration tests") |
|||
} |
|||
|
|||
wd, err := os.Getwd() |
|||
if err != nil { |
|||
t.Fatalf("Failed to get working directory: %v", err) |
|||
} |
|||
|
|||
seaweedDir := wd |
|||
for i := 0; i < 6; i++ { |
|||
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { |
|||
break |
|||
} |
|||
seaweedDir = filepath.Dir(seaweedDir) |
|||
} |
|||
|
|||
weedBinary := filepath.Join(seaweedDir, "weed", "weed") |
|||
if _, err := os.Stat(weedBinary); err != nil { |
|||
weedBinary = "weed" |
|||
if _, err := exec.LookPath(weedBinary); err != nil { |
|||
t.Skip("weed binary not found, skipping integration test") |
|||
} |
|||
} |
|||
|
|||
dataDir, err := os.MkdirTemp("", "seaweed-polaris-test-*") |
|||
if err != nil { |
|||
t.Fatalf("Failed to create temp dir: %v", err) |
|||
} |
|||
|
|||
bindIP := testutil.FindBindIP() |
|||
|
|||
masterPort, masterGrpcPort := testutil.MustFreePortPair(t, "Master") |
|||
volumePort, volumeGrpcPort := testutil.MustFreePortPair(t, "Volume") |
|||
filerPort, filerGrpcPort := testutil.MustFreePortPair(t, "Filer") |
|||
s3Port, s3GrpcPort := testutil.MustFreePortPair(t, "S3") |
|||
polarisPort, polarisAdminPort := testutil.MustFreePortPair(t, "Polaris") |
|||
|
|||
return &TestEnvironment{ |
|||
seaweedDir: seaweedDir, |
|||
weedBinary: weedBinary, |
|||
dataDir: dataDir, |
|||
bindIP: bindIP, |
|||
s3Port: s3Port, |
|||
s3GrpcPort: s3GrpcPort, |
|||
masterPort: masterPort, |
|||
masterGrpcPort: masterGrpcPort, |
|||
filerPort: filerPort, |
|||
filerGrpcPort: filerGrpcPort, |
|||
volumePort: volumePort, |
|||
volumeGrpcPort: volumeGrpcPort, |
|||
polarisPort: polarisPort, |
|||
polarisAdminPort: polarisAdminPort, |
|||
accessKey: "admin", |
|||
secretKey: "admin", |
|||
} |
|||
} |
|||
|
|||
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { |
|||
t.Helper() |
|||
|
|||
iamConfigPath := filepath.Join(env.dataDir, "iam.json") |
|||
iamConfig := fmt.Sprintf(`{ |
|||
"identities": [ |
|||
{ |
|||
"name": "admin", |
|||
"credentials": [ |
|||
{ |
|||
"accessKey": "%s", |
|||
"secretKey": "%s" |
|||
} |
|||
], |
|||
"actions": ["Admin", "Read", "List", "Tagging", "Write"] |
|||
} |
|||
], |
|||
"sts": { |
|||
"tokenDuration": "12h", |
|||
"maxSessionLength": "24h", |
|||
"issuer": "seaweedfs-sts", |
|||
"signingKey": "%s" |
|||
}, |
|||
"roles": [ |
|||
{ |
|||
"roleName": "LakekeeperVendedRole", |
|||
"roleArn": "%s", |
|||
"trustPolicy": { |
|||
"Version": "2012-10-17", |
|||
"Statement": [ |
|||
{ |
|||
"Effect": "Allow", |
|||
"Principal": "*", |
|||
"Action": "sts:AssumeRole" |
|||
} |
|||
] |
|||
}, |
|||
"attachedPolicies": ["FullAccess"] |
|||
} |
|||
], |
|||
"policies": [ |
|||
{ |
|||
"name": "FullAccess", |
|||
"document": { |
|||
"Version": "2012-10-17", |
|||
"Statement": [ |
|||
{ |
|||
"Effect": "Allow", |
|||
"Action": "*", |
|||
"Resource": "*" |
|||
} |
|||
] |
|||
} |
|||
} |
|||
] |
|||
}`, env.accessKey, env.secretKey, polarisSigningKey, polarisRoleArn) |
|||
|
|||
if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil { |
|||
t.Fatalf("Failed to create IAM config: %v", err) |
|||
} |
|||
|
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
env.weedCancel = cancel |
|||
|
|||
cmd := exec.CommandContext(ctx, env.weedBinary, "-v", "4", "mini", |
|||
"-master.port", fmt.Sprintf("%d", env.masterPort), |
|||
"-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), |
|||
"-volume.port", fmt.Sprintf("%d", env.volumePort), |
|||
"-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), |
|||
"-filer.port", fmt.Sprintf("%d", env.filerPort), |
|||
"-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), |
|||
"-s3.port", fmt.Sprintf("%d", env.s3Port), |
|||
"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), |
|||
"-s3.config", iamConfigPath, |
|||
"-s3.iam.config", iamConfigPath, |
|||
"-s3.iam.readOnly=false", |
|||
"-ip", env.bindIP, |
|||
"-ip.bind", "0.0.0.0", |
|||
"-dir", env.dataDir, |
|||
) |
|||
cmd.Dir = env.dataDir |
|||
cmd.Stdout = os.Stdout |
|||
cmd.Stderr = os.Stderr |
|||
|
|||
if err := cmd.Start(); err != nil { |
|||
t.Fatalf("Failed to start SeaweedFS: %v", err) |
|||
} |
|||
env.weedProcess = cmd |
|||
|
|||
if !testutil.WaitForService(fmt.Sprintf("http://localhost:%d/status", env.s3Port), 30*time.Second) { |
|||
t.Fatalf("S3 API failed to become ready") |
|||
} |
|||
} |
|||
|
|||
func (env *TestEnvironment) StartPolaris(t *testing.T) { |
|||
t.Helper() |
|||
|
|||
containerName := fmt.Sprintf("seaweed-polaris-%d", time.Now().UnixNano()) |
|||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) |
|||
defer cancel() |
|||
|
|||
cmd := exec.CommandContext(ctx, "docker", "run", "-d", "--rm", |
|||
"--name", containerName, |
|||
"--add-host", "host.docker.internal:host-gateway", |
|||
"-p", fmt.Sprintf("%d:8181", env.polarisPort), |
|||
"-p", fmt.Sprintf("%d:8182", env.polarisAdminPort), |
|||
"-e", fmt.Sprintf("AWS_REGION=%s", polarisRegion), |
|||
"-e", fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", env.accessKey), |
|||
"-e", fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", env.secretKey), |
|||
"-e", fmt.Sprintf("POLARIS_BOOTSTRAP_CREDENTIALS=%s,%s,%s", polarisRealm, polarisRootClientID, polarisRootClientSecret), |
|||
"-e", fmt.Sprintf("polaris.realm-context.realms=%s", polarisRealm), |
|||
"-e", "quarkus.otel.sdk.disabled=true", |
|||
polarisImage, |
|||
) |
|||
|
|||
output, err := cmd.CombinedOutput() |
|||
if err != nil { |
|||
if ctx.Err() == context.DeadlineExceeded { |
|||
t.Fatalf("Timed out waiting for Polaris container: %v\nOutput:\n%s", ctx.Err(), string(output)) |
|||
} |
|||
t.Fatalf("Failed to start Polaris: %v\nOutput:\n%s", err, string(output)) |
|||
} |
|||
env.polarisContainer = containerName |
|||
|
|||
if !testutil.WaitForService(fmt.Sprintf("http://localhost:%d/q/health", env.polarisAdminPort), 60*time.Second) { |
|||
logCtx, logCancel := context.WithTimeout(context.Background(), 10*time.Second) |
|||
defer logCancel() |
|||
logs, _ := exec.CommandContext(logCtx, "docker", "logs", env.polarisContainer).CombinedOutput() |
|||
t.Fatalf("Polaris failed to become ready\nLogs:\n%s", string(logs)) |
|||
} |
|||
} |
|||
|
|||
func (env *TestEnvironment) Cleanup(t *testing.T) { |
|||
t.Helper() |
|||
if env.weedCancel != nil { |
|||
env.weedCancel() |
|||
} |
|||
if env.weedProcess != nil { |
|||
time.Sleep(1 * time.Second) |
|||
_ = env.weedProcess.Wait() |
|||
} |
|||
if env.polarisContainer != "" { |
|||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
|||
defer cancel() |
|||
_ = exec.CommandContext(ctx, "docker", "rm", "-f", env.polarisContainer).Run() |
|||
} |
|||
if env.dataDir != "" { |
|||
_ = os.RemoveAll(env.dataDir) |
|||
} |
|||
} |
|||
|
|||
func (env *TestEnvironment) polarisEndpoint() string { |
|||
return fmt.Sprintf("http://127.0.0.1:%d", env.polarisPort) |
|||
} |
|||
|
|||
func (env *TestEnvironment) s3Endpoint() string { |
|||
return fmt.Sprintf("http://127.0.0.1:%d", env.s3Port) |
|||
} |
|||
|
|||
func (env *TestEnvironment) s3InternalEndpoint() string { |
|||
return fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) |
|||
} |
|||
|
|||
type polarisHTTPClient struct { |
|||
baseURL string |
|||
realm string |
|||
token string |
|||
httpClient *http.Client |
|||
} |
|||
|
|||
func newPolarisHTTPClient(baseURL, realm, token string) *polarisHTTPClient { |
|||
return &polarisHTTPClient{ |
|||
baseURL: baseURL, |
|||
realm: realm, |
|||
token: token, |
|||
httpClient: &http.Client{ |
|||
Timeout: 30 * time.Second, |
|||
}, |
|||
} |
|||
} |
|||
|
|||
func (c *polarisHTTPClient) doJSON(ctx context.Context, method, path string, body interface{}, out interface{}) error { |
|||
return c.doJSONWithHeaders(ctx, method, path, body, out, nil) |
|||
} |
|||
|
|||
func (c *polarisHTTPClient) doJSONWithHeaders(ctx context.Context, method, path string, body interface{}, out interface{}, headers map[string]string) error { |
|||
var reader io.Reader |
|||
if body != nil { |
|||
encoded, err := json.Marshal(body) |
|||
if err != nil { |
|||
return fmt.Errorf("encode request body: %w", err) |
|||
} |
|||
reader = bytes.NewReader(encoded) |
|||
} |
|||
|
|||
req, err := http.NewRequestWithContext(ctx, method, c.baseURL+path, reader) |
|||
if err != nil { |
|||
return fmt.Errorf("create request: %w", err) |
|||
} |
|||
if body != nil { |
|||
req.Header.Set("Content-Type", "application/json") |
|||
} |
|||
req.Header.Set("Accept", "application/json") |
|||
if c.realm != "" { |
|||
req.Header.Set("Polaris-Realm", c.realm) |
|||
} |
|||
if c.token != "" { |
|||
req.Header.Set("Authorization", "Bearer "+c.token) |
|||
} |
|||
for key, value := range headers { |
|||
req.Header.Set(key, value) |
|||
} |
|||
|
|||
resp, err := c.httpClient.Do(req) |
|||
if err != nil { |
|||
return fmt.Errorf("request failed: %w", err) |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode < 200 || resp.StatusCode >= 300 { |
|||
bodyBytes, readErr := io.ReadAll(resp.Body) |
|||
if readErr != nil { |
|||
return fmt.Errorf("request failed with status %d and could not read response body: %w", resp.StatusCode, readErr) |
|||
} |
|||
return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(bodyBytes))) |
|||
} |
|||
if out == nil { |
|||
return nil |
|||
} |
|||
if err := json.NewDecoder(resp.Body).Decode(out); err != nil { |
|||
return fmt.Errorf("decode response: %w", err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func newS3Client(ctx context.Context, endpoint, region string, creds aws.Credentials, pathStyle bool) (*s3.Client, error) { |
|||
resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { |
|||
if service == s3.ServiceID { |
|||
return aws.Endpoint{ |
|||
URL: endpoint, |
|||
SigningRegion: region, |
|||
HostnameImmutable: true, |
|||
}, nil |
|||
} |
|||
return aws.Endpoint{}, &aws.EndpointNotFoundError{} |
|||
}) |
|||
|
|||
cfg, err := config.LoadDefaultConfig(ctx, |
|||
config.WithRegion(region), |
|||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(creds.AccessKeyID, creds.SecretAccessKey, creds.SessionToken)), |
|||
config.WithEndpointResolverWithOptions(resolver), |
|||
) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
return s3.NewFromConfig(cfg, func(o *s3.Options) { |
|||
o.UsePathStyle = pathStyle |
|||
}), nil |
|||
} |
|||
@ -0,0 +1,805 @@ |
|||
package polaris |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"encoding/json" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"net/url" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/aws/aws-sdk-go-v2/aws" |
|||
"github.com/aws/aws-sdk-go-v2/service/s3" |
|||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" |
|||
) |
|||
|
|||
type polarisSession struct { |
|||
catalogName string |
|||
bucketName string |
|||
token string |
|||
baseLocation string |
|||
} |
|||
|
|||
type polarisTableSetup struct { |
|||
namespace string |
|||
table string |
|||
dataKeyPrefix string |
|||
s3Client *s3.Client |
|||
} |
|||
|
|||
type polarisCatalogClient struct { |
|||
http *polarisHTTPClient |
|||
catalog string |
|||
} |
|||
|
|||
type polarisCredentials struct { |
|||
ClientID string `json:"clientId"` |
|||
ClientSecret string `json:"clientSecret"` |
|||
} |
|||
|
|||
type createPrincipalResponse struct { |
|||
Credentials polarisCredentials `json:"credentials"` |
|||
} |
|||
|
|||
type createCatalogRequest struct { |
|||
Catalog polarisCatalog `json:"catalog"` |
|||
} |
|||
|
|||
type polarisCatalog struct { |
|||
Name string `json:"name"` |
|||
Type string `json:"type"` |
|||
ReadOnly bool `json:"readOnly"` |
|||
Properties map[string]string `json:"properties"` |
|||
StorageConfigInfo polarisStorageConfig `json:"storageConfigInfo"` |
|||
} |
|||
|
|||
type polarisStorageConfig struct { |
|||
StorageType string `json:"storageType"` |
|||
AllowedLocations []string `json:"allowedLocations"` |
|||
Endpoint string `json:"endpoint"` |
|||
EndpointInternal string `json:"endpointInternal"` |
|||
StsEndpoint string `json:"stsEndpoint"` |
|||
PathStyleAccess bool `json:"pathStyleAccess"` |
|||
RoleArn string `json:"roleArn"` |
|||
Region string `json:"region"` |
|||
} |
|||
|
|||
type createNamespaceRequest struct { |
|||
Namespace []string `json:"namespace"` |
|||
} |
|||
|
|||
type createTableRequest struct { |
|||
Name string `json:"name"` |
|||
Location string `json:"location"` |
|||
Schema icebergSchema `json:"schema"` |
|||
PartitionSpec icebergPartition `json:"partition-spec"` |
|||
SortOrder icebergSortOrder `json:"sort-order"` |
|||
Properties map[string]string `json:"properties"` |
|||
} |
|||
|
|||
type icebergSchema struct { |
|||
Type string `json:"type"` |
|||
SchemaID int `json:"schema-id"` |
|||
Fields []icebergSchemaField `json:"fields"` |
|||
} |
|||
|
|||
type icebergSchemaField struct { |
|||
ID int `json:"id"` |
|||
Name string `json:"name"` |
|||
Type string `json:"type"` |
|||
Required bool `json:"required"` |
|||
} |
|||
|
|||
type icebergPartition struct { |
|||
SpecID int `json:"spec-id"` |
|||
Fields []icebergPartitionField `json:"fields"` |
|||
} |
|||
|
|||
type icebergPartitionField struct { |
|||
SourceID int `json:"source-id"` |
|||
FieldID int `json:"field-id"` |
|||
Name string `json:"name"` |
|||
Transform string `json:"transform"` |
|||
} |
|||
|
|||
type icebergSortOrder struct { |
|||
OrderID int `json:"order-id"` |
|||
Fields []icebergSortField `json:"fields"` |
|||
} |
|||
|
|||
type icebergSortField struct { |
|||
SourceID int `json:"source-id"` |
|||
Direction string `json:"direction"` |
|||
NullOrder string `json:"null-order"` |
|||
} |
|||
|
|||
type loadTableResponse struct { |
|||
Config map[string]string `json:"config"` |
|||
StorageCredentials []storageCredential `json:"storage-credentials"` |
|||
} |
|||
|
|||
type loadCredentialsResponse struct { |
|||
StorageCredentials []storageCredential `json:"storage-credentials"` |
|||
} |
|||
|
|||
type storageCredential struct { |
|||
Prefix string `json:"prefix"` |
|||
Config map[string]string `json:"config"` |
|||
} |
|||
|
|||
func bootstrapPolarisTest(t *testing.T, env *TestEnvironment) (context.Context, context.CancelFunc, polarisSession, *polarisTableSetup, func()) { |
|||
t.Helper() |
|||
|
|||
t.Logf(">>> Starting SeaweedFS with Polaris configuration...") |
|||
env.StartSeaweedFS(t) |
|||
t.Logf(">>> SeaweedFS started.") |
|||
|
|||
t.Logf(">>> Starting Polaris...") |
|||
env.StartPolaris(t) |
|||
t.Logf(">>> Polaris started.") |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) |
|||
session := newPolarisSession(t, ctx, env) |
|||
setup, cleanup := setupPolarisTable(t, ctx, env, session) |
|||
|
|||
return ctx, cancel, session, setup, cleanup |
|||
} |
|||
|
|||
func TestPolarisIntegration(t *testing.T) { |
|||
env := NewTestEnvironment(t) |
|||
defer env.Cleanup(t) |
|||
|
|||
ctx, cancel, session, setup, cleanup := bootstrapPolarisTest(t, env) |
|||
defer cancel() |
|||
defer cleanup() |
|||
|
|||
objectKey := fmt.Sprintf("%s/hello-%d.txt", setup.dataKeyPrefix, time.Now().UnixNano()) |
|||
payload := []byte("polaris") |
|||
|
|||
if _, err := setup.s3Client.PutObject(ctx, &s3.PutObjectInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
Body: bytes.NewReader(payload), |
|||
}); err != nil { |
|||
t.Fatalf("PutObject failed: %v", err) |
|||
} |
|||
|
|||
listObjects, err := setup.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Prefix: aws.String(objectKey), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("ListObjectsV2 failed: %v", err) |
|||
} |
|||
|
|||
found := false |
|||
for _, obj := range listObjects.Contents { |
|||
if aws.ToString(obj.Key) == objectKey { |
|||
found = true |
|||
break |
|||
} |
|||
} |
|||
if !found { |
|||
t.Fatalf("Object %s not found in list", objectKey) |
|||
} |
|||
|
|||
getResp, err := setup.s3Client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("GetObject failed: %v", err) |
|||
} |
|||
body, err := io.ReadAll(getResp.Body) |
|||
_ = getResp.Body.Close() |
|||
if err != nil { |
|||
t.Fatalf("Read object body failed: %v", err) |
|||
} |
|||
if !bytes.Equal(body, payload) { |
|||
t.Fatalf("Unexpected object payload: got %q want %q", string(body), string(payload)) |
|||
} |
|||
|
|||
if _, err := setup.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
}); err != nil { |
|||
t.Fatalf("DeleteObject failed: %v", err) |
|||
} |
|||
} |
|||
|
|||
func TestPolarisTableIntegration(t *testing.T) { |
|||
env := NewTestEnvironment(t) |
|||
defer env.Cleanup(t) |
|||
|
|||
ctx, cancel, session, setup, cleanup := bootstrapPolarisTest(t, env) |
|||
defer cancel() |
|||
defer cleanup() |
|||
|
|||
objectKey := fmt.Sprintf("%s/part-%d.parquet", setup.dataKeyPrefix, time.Now().UnixNano()) |
|||
createResp, err := setup.s3Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("CreateMultipartUpload failed: %v", err) |
|||
} |
|||
|
|||
uploadID := aws.ToString(createResp.UploadId) |
|||
multipartCompleted := false |
|||
defer func() { |
|||
if uploadID == "" || multipartCompleted { |
|||
return |
|||
} |
|||
_, _ = setup.s3Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
UploadId: aws.String(uploadID), |
|||
}) |
|||
}() |
|||
|
|||
partSize := 5 * 1024 * 1024 |
|||
part1 := bytes.Repeat([]byte("a"), partSize) |
|||
part2 := bytes.Repeat([]byte("b"), 1024*1024) |
|||
|
|||
part1Resp, err := setup.s3Client.UploadPart(ctx, &s3.UploadPartInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
UploadId: aws.String(uploadID), |
|||
PartNumber: aws.Int32(1), |
|||
Body: bytes.NewReader(part1), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("UploadPart 1 failed: %v", err) |
|||
} |
|||
|
|||
part2Resp, err := setup.s3Client.UploadPart(ctx, &s3.UploadPartInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
UploadId: aws.String(uploadID), |
|||
PartNumber: aws.Int32(2), |
|||
Body: bytes.NewReader(part2), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("UploadPart 2 failed: %v", err) |
|||
} |
|||
|
|||
_, err = setup.s3Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
UploadId: aws.String(uploadID), |
|||
MultipartUpload: &s3types.CompletedMultipartUpload{ |
|||
Parts: []s3types.CompletedPart{ |
|||
{ |
|||
ETag: part1Resp.ETag, |
|||
PartNumber: aws.Int32(1), |
|||
}, |
|||
{ |
|||
ETag: part2Resp.ETag, |
|||
PartNumber: aws.Int32(2), |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("CompleteMultipartUpload failed: %v", err) |
|||
} |
|||
multipartCompleted = true |
|||
|
|||
headResp, err := setup.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(session.bucketName), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
if err != nil { |
|||
t.Fatalf("HeadObject after multipart upload failed: %v", err) |
|||
} |
|||
expectedSize := int64(len(part1) + len(part2)) |
|||
if headResp.ContentLength == nil || *headResp.ContentLength != expectedSize { |
|||
t.Fatalf("Unexpected content length: got %d want %d", aws.ToInt64(headResp.ContentLength), expectedSize) |
|||
} |
|||
} |
|||
|
|||
func newPolarisSession(t *testing.T, ctx context.Context, env *TestEnvironment) polarisSession { |
|||
t.Helper() |
|||
|
|||
adminCreds := aws.Credentials{ |
|||
AccessKeyID: env.accessKey, |
|||
SecretAccessKey: env.secretKey, |
|||
Source: "polaris-admin", |
|||
} |
|||
adminS3, err := newS3Client(ctx, env.s3Endpoint(), polarisRegion, adminCreds, true) |
|||
if err != nil { |
|||
t.Fatalf("Create admin S3 client failed: %v", err) |
|||
} |
|||
|
|||
bucketName := fmt.Sprintf("polaris-bucket-%d", time.Now().UnixNano()) |
|||
if _, err := adminS3.CreateBucket(ctx, &s3.CreateBucketInput{ |
|||
Bucket: aws.String(bucketName), |
|||
}); err != nil { |
|||
t.Fatalf("CreateBucket failed: %v", err) |
|||
} |
|||
policy := fmt.Sprintf(`{"Version":"2012-10-17","Statement":[{"Sid":"AllowPolarisVendedAccess","Effect":"Allow","Principal":"*","Action":"s3:*","Resource":["arn:aws:s3:::%s","arn:aws:s3:::%s/polaris/*"]}]}`, bucketName, bucketName) |
|||
if _, err := adminS3.PutBucketPolicy(ctx, &s3.PutBucketPolicyInput{ |
|||
Bucket: aws.String(bucketName), |
|||
Policy: aws.String(policy), |
|||
}); err != nil { |
|||
t.Fatalf("PutBucketPolicy failed: %v", err) |
|||
} |
|||
|
|||
rootToken, err := fetchPolarisToken(ctx, env.polarisEndpoint(), polarisRootClientID, polarisRootClientSecret) |
|||
if err != nil { |
|||
t.Fatalf("Polaris root token request failed: %v", err) |
|||
} |
|||
|
|||
managementClient := newPolarisHTTPClient(env.polarisEndpoint(), polarisRealm, rootToken) |
|||
catalogName := fmt.Sprintf("polaris_catalog_%d", time.Now().UnixNano()) |
|||
baseLocation := fmt.Sprintf("s3://%s/polaris", bucketName) |
|||
|
|||
catalogRequest := createCatalogRequest{ |
|||
Catalog: polarisCatalog{ |
|||
Name: catalogName, |
|||
Type: "INTERNAL", |
|||
ReadOnly: false, |
|||
Properties: map[string]string{ |
|||
"default-base-location": baseLocation, |
|||
}, |
|||
StorageConfigInfo: polarisStorageConfig{ |
|||
StorageType: "S3", |
|||
AllowedLocations: []string{baseLocation}, |
|||
Endpoint: env.s3Endpoint(), |
|||
EndpointInternal: env.s3InternalEndpoint(), |
|||
StsEndpoint: env.s3InternalEndpoint(), |
|||
PathStyleAccess: true, |
|||
RoleArn: polarisRoleArn, |
|||
Region: polarisRegion, |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/catalogs", catalogRequest, nil); err != nil { |
|||
t.Fatalf("Create catalog failed: %v", err) |
|||
} |
|||
|
|||
principalName := fmt.Sprintf("polaris_user_%d", time.Now().UnixNano()) |
|||
principalRoleName := fmt.Sprintf("polaris_principal_role_%d", time.Now().UnixNano()) |
|||
catalogRoleName := fmt.Sprintf("polaris_catalog_role_%d", time.Now().UnixNano()) |
|||
|
|||
var principalResp createPrincipalResponse |
|||
if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/principals", map[string]interface{}{ |
|||
"principal": map[string]interface{}{ |
|||
"name": principalName, |
|||
"properties": map[string]string{}, |
|||
}, |
|||
}, &principalResp); err != nil { |
|||
t.Fatalf("Create principal failed: %v", err) |
|||
} |
|||
if principalResp.Credentials.ClientID == "" || principalResp.Credentials.ClientSecret == "" { |
|||
t.Fatalf("Missing principal credentials in response") |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPost, "/api/management/v1/principal-roles", map[string]interface{}{ |
|||
"principalRole": map[string]interface{}{ |
|||
"name": principalRoleName, |
|||
"properties": map[string]string{}, |
|||
}, |
|||
}, nil); err != nil { |
|||
t.Fatalf("Create principal role failed: %v", err) |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPost, fmt.Sprintf("/api/management/v1/catalogs/%s/catalog-roles", url.PathEscape(catalogName)), map[string]interface{}{ |
|||
"catalogRole": map[string]interface{}{ |
|||
"name": catalogRoleName, |
|||
"properties": map[string]string{}, |
|||
}, |
|||
}, nil); err != nil { |
|||
t.Fatalf("Create catalog role failed: %v", err) |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/principals/%s/principal-roles", url.PathEscape(principalName)), map[string]interface{}{ |
|||
"principalRole": map[string]interface{}{ |
|||
"name": principalRoleName, |
|||
}, |
|||
}, nil); err != nil { |
|||
t.Fatalf("Assign principal role failed: %v", err) |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/principal-roles/%s/catalog-roles/%s", url.PathEscape(principalRoleName), url.PathEscape(catalogName)), map[string]interface{}{ |
|||
"catalogRole": map[string]interface{}{ |
|||
"name": catalogRoleName, |
|||
}, |
|||
}, nil); err != nil { |
|||
t.Fatalf("Assign catalog role failed: %v", err) |
|||
} |
|||
|
|||
if err := managementClient.doJSON(ctx, http.MethodPut, fmt.Sprintf("/api/management/v1/catalogs/%s/catalog-roles/%s/grants", url.PathEscape(catalogName), url.PathEscape(catalogRoleName)), map[string]interface{}{ |
|||
"type": "catalog", |
|||
"privilege": "CATALOG_MANAGE_CONTENT", |
|||
}, nil); err != nil { |
|||
t.Fatalf("Grant catalog privilege failed: %v", err) |
|||
} |
|||
|
|||
userToken, err := fetchPolarisToken(ctx, env.polarisEndpoint(), principalResp.Credentials.ClientID, principalResp.Credentials.ClientSecret) |
|||
if err != nil { |
|||
t.Fatalf("Polaris user token request failed: %v", err) |
|||
} |
|||
|
|||
return polarisSession{ |
|||
catalogName: catalogName, |
|||
bucketName: bucketName, |
|||
token: userToken, |
|||
baseLocation: baseLocation, |
|||
} |
|||
} |
|||
|
|||
func setupPolarisTable(t *testing.T, ctx context.Context, env *TestEnvironment, session polarisSession) (*polarisTableSetup, func()) { |
|||
t.Helper() |
|||
|
|||
catalogClient := newPolarisCatalogClient(env.polarisEndpoint(), polarisRealm, session.token, session.catalogName) |
|||
namespace := fmt.Sprintf("polaris_ns_%d", time.Now().UnixNano()) |
|||
table := fmt.Sprintf("polaris_table_%d", time.Now().UnixNano()) |
|||
|
|||
if err := catalogClient.CreateNamespace(ctx, namespace); err != nil { |
|||
t.Fatalf("CreateNamespace failed: %v", err) |
|||
} |
|||
|
|||
location := fmt.Sprintf("%s/%s/%s", session.baseLocation, namespace, table) |
|||
if err := catalogClient.CreateTable(ctx, namespace, table, location); err != nil { |
|||
t.Fatalf("CreateTable failed: %v", err) |
|||
} |
|||
|
|||
loadResp, err := catalogClient.LoadTable(ctx, namespace, table) |
|||
if err != nil { |
|||
t.Fatalf("LoadTable failed: %v", err) |
|||
} |
|||
|
|||
credsResp, err := catalogClient.LoadCredentials(ctx, namespace, table) |
|||
if err != nil { |
|||
t.Fatalf("LoadCredentials failed: %v", err) |
|||
} |
|||
if len(credsResp.StorageCredentials) == 0 { |
|||
t.Fatalf("LoadCredentials returned no storage credentials") |
|||
} |
|||
|
|||
credentialSource := &loadTableResponse{ |
|||
Config: loadResp.Config, |
|||
StorageCredentials: credsResp.StorageCredentials, |
|||
} |
|||
|
|||
dataPrefix := location + "/data" |
|||
creds, endpoint, region, pathStyle, err := extractS3Credentials(credentialSource, dataPrefix, env.s3Endpoint(), polarisRegion) |
|||
if err != nil { |
|||
t.Fatalf("Extract vended credentials failed: %v", err) |
|||
} |
|||
dataKeyPrefix, err := s3URIToKeyPrefix(dataPrefix, session.bucketName) |
|||
if err != nil { |
|||
t.Fatalf("Invalid data prefix %s: %v", dataPrefix, err) |
|||
} |
|||
|
|||
s3Client, err := newS3Client(ctx, endpoint, region, creds, pathStyle) |
|||
if err != nil { |
|||
t.Fatalf("Create vended S3 client failed: %v", err) |
|||
} |
|||
|
|||
cleanup := func() { |
|||
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) |
|||
defer cleanupCancel() |
|||
if err := catalogClient.DeleteTable(cleanupCtx, namespace, table); err != nil { |
|||
t.Logf("DeleteTable failed: %v", err) |
|||
} |
|||
if err := catalogClient.DeleteNamespace(cleanupCtx, namespace); err != nil { |
|||
t.Logf("DeleteNamespace failed: %v", err) |
|||
} |
|||
} |
|||
|
|||
return &polarisTableSetup{ |
|||
namespace: namespace, |
|||
table: table, |
|||
dataKeyPrefix: dataKeyPrefix, |
|||
s3Client: s3Client, |
|||
}, cleanup |
|||
} |
|||
|
|||
func fetchPolarisToken(ctx context.Context, baseURL, clientID, clientSecret string) (string, error) { |
|||
form := url.Values{} |
|||
form.Set("grant_type", "client_credentials") |
|||
form.Set("client_id", clientID) |
|||
form.Set("client_secret", clientSecret) |
|||
form.Set("scope", "PRINCIPAL_ROLE:ALL") |
|||
|
|||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/api/catalog/v1/oauth/tokens", strings.NewReader(form.Encode())) |
|||
if err != nil { |
|||
return "", fmt.Errorf("create token request: %w", err) |
|||
} |
|||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") |
|||
req.Header.Set("Accept", "application/json") |
|||
|
|||
client := &http.Client{Timeout: 30 * time.Second} |
|||
resp, err := client.Do(req) |
|||
if err != nil { |
|||
return "", fmt.Errorf("token request failed: %w", err) |
|||
} |
|||
defer resp.Body.Close() |
|||
|
|||
if resp.StatusCode != http.StatusOK { |
|||
body, readErr := io.ReadAll(resp.Body) |
|||
if readErr != nil { |
|||
return "", fmt.Errorf("token request failed with status %d and reading body: %w", resp.StatusCode, readErr) |
|||
} |
|||
return "", fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) |
|||
} |
|||
|
|||
var tokenResp struct { |
|||
AccessToken string `json:"access_token"` |
|||
} |
|||
if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { |
|||
return "", fmt.Errorf("decode token response: %w", err) |
|||
} |
|||
if tokenResp.AccessToken == "" { |
|||
return "", fmt.Errorf("missing access token in response") |
|||
} |
|||
|
|||
return tokenResp.AccessToken, nil |
|||
} |
|||
|
|||
func newPolarisCatalogClient(baseURL, realm, token, catalog string) *polarisCatalogClient { |
|||
return &polarisCatalogClient{ |
|||
http: newPolarisHTTPClient(baseURL, realm, token), |
|||
catalog: catalog, |
|||
} |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) CreateNamespace(ctx context.Context, namespace string) error { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces", url.PathEscape(c.catalog)) |
|||
req := createNamespaceRequest{Namespace: []string{namespace}} |
|||
return c.http.doJSON(ctx, http.MethodPost, path, req, nil) |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) DeleteNamespace(ctx context.Context, namespace string) error { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s", url.PathEscape(c.catalog), url.PathEscape(namespace)) |
|||
return c.http.doJSON(ctx, http.MethodDelete, path, nil, nil) |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) CreateTable(ctx context.Context, namespace, table, location string) error { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables", url.PathEscape(c.catalog), url.PathEscape(namespace)) |
|||
|
|||
req := createTableRequest{ |
|||
Name: table, |
|||
Location: location, |
|||
Schema: icebergSchema{ |
|||
Type: "struct", |
|||
SchemaID: 0, |
|||
Fields: []icebergSchemaField{ |
|||
{ |
|||
ID: 1, |
|||
Name: "id", |
|||
Type: "long", |
|||
Required: false, |
|||
}, |
|||
}, |
|||
}, |
|||
PartitionSpec: icebergPartition{ |
|||
SpecID: 0, |
|||
Fields: []icebergPartitionField{}, |
|||
}, |
|||
SortOrder: icebergSortOrder{ |
|||
OrderID: 0, |
|||
Fields: []icebergSortField{}, |
|||
}, |
|||
Properties: map[string]string{ |
|||
"format-version": "2", |
|||
}, |
|||
} |
|||
|
|||
return c.http.doJSON(ctx, http.MethodPost, path, req, nil) |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) DeleteTable(ctx context.Context, namespace, table string) error { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table)) |
|||
return c.http.doJSON(ctx, http.MethodDelete, path, nil, nil) |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) LoadTable(ctx context.Context, namespace, table string) (*loadTableResponse, error) { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table)) |
|||
var resp loadTableResponse |
|||
headers := map[string]string{ |
|||
"X-Iceberg-Access-Delegation": "vended-credentials", |
|||
} |
|||
if err := c.http.doJSONWithHeaders(ctx, http.MethodGet, path, nil, &resp, headers); err != nil { |
|||
return nil, err |
|||
} |
|||
return &resp, nil |
|||
} |
|||
|
|||
func (c *polarisCatalogClient) LoadCredentials(ctx context.Context, namespace, table string) (*loadCredentialsResponse, error) { |
|||
path := fmt.Sprintf("/api/catalog/v1/%s/namespaces/%s/tables/%s/credentials", url.PathEscape(c.catalog), url.PathEscape(namespace), url.PathEscape(table)) |
|||
var resp loadCredentialsResponse |
|||
if err := c.http.doJSON(ctx, http.MethodGet, path, nil, &resp); err != nil { |
|||
return nil, err |
|||
} |
|||
return &resp, nil |
|||
} |
|||
|
|||
func extractS3Credentials(load *loadTableResponse, targetPrefix, fallbackEndpoint, fallbackRegion string) (aws.Credentials, string, string, bool, error) { |
|||
configMap, err := selectStorageConfig(load, targetPrefix) |
|||
if err != nil { |
|||
return aws.Credentials{}, "", "", false, err |
|||
} |
|||
|
|||
lookup := func(keys ...string) string { |
|||
for _, key := range keys { |
|||
if val, ok := configMap[key]; ok && val != "" { |
|||
return val |
|||
} |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
accessKey := lookup("s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id") |
|||
secretKey := lookup("s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key") |
|||
sessionToken := lookup("s3.session-token", "aws.session-token", "s3.sessionToken", "aws.sessionToken", "sessionToken", "session_token") |
|||
if accessKey == "" || secretKey == "" { |
|||
if altConfig := findConfigWithKeys(load, targetPrefix); altConfig != nil { |
|||
configMap = altConfig |
|||
accessKey = lookup("s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id") |
|||
secretKey = lookup("s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key") |
|||
sessionToken = lookup("s3.session-token", "aws.session-token", "s3.sessionToken", "aws.sessionToken", "sessionToken", "session_token") |
|||
} |
|||
} |
|||
if accessKey == "" || secretKey == "" { |
|||
return aws.Credentials{}, "", "", false, fmt.Errorf("missing access key or secret in storage credentials") |
|||
} |
|||
|
|||
endpoint := lookup("s3.endpoint", "s3.endpoint-url", "aws.endpoint") |
|||
if endpoint == "" { |
|||
endpoint = fallbackEndpoint |
|||
} |
|||
if endpoint != "" && !strings.HasPrefix(endpoint, "http") { |
|||
endpoint = "http://" + endpoint |
|||
} |
|||
|
|||
region := lookup("s3.region", "aws.region") |
|||
if region == "" { |
|||
region = fallbackRegion |
|||
} |
|||
|
|||
pathStyle := true |
|||
if value := lookup("s3.path-style-access", "s3.pathStyleAccess", "path-style-access"); value != "" { |
|||
pathStyle = strings.EqualFold(value, "true") |
|||
} |
|||
|
|||
return aws.Credentials{ |
|||
AccessKeyID: accessKey, |
|||
SecretAccessKey: secretKey, |
|||
SessionToken: sessionToken, |
|||
Source: "polaris-vended", |
|||
}, endpoint, region, pathStyle, nil |
|||
} |
|||
|
|||
func selectStorageConfig(load *loadTableResponse, targetPrefix string) (map[string]string, error) { |
|||
switch len(load.StorageCredentials) { |
|||
case 0: |
|||
if load.Config == nil { |
|||
return nil, fmt.Errorf("polaris returned no storage credentials or config") |
|||
} |
|||
return load.Config, nil |
|||
case 1: |
|||
cred := load.StorageCredentials[0] |
|||
if cred.Config == nil { |
|||
return nil, fmt.Errorf("storage credential for prefix %s returned nil config", cred.Prefix) |
|||
} |
|||
return cred.Config, nil |
|||
default: |
|||
if targetPrefix == "" { |
|||
return nil, fmt.Errorf("multiple storage credentials (%d) returned but no target prefix provided", len(load.StorageCredentials)) |
|||
} |
|||
normalizedTarget := normalizePrefix(targetPrefix) |
|||
if normalizedTarget == "" { |
|||
return nil, fmt.Errorf("target prefix %q normalized to empty string", targetPrefix) |
|||
} |
|||
var bestConfig map[string]string |
|||
bestLen := -1 |
|||
for _, cred := range load.StorageCredentials { |
|||
if cred.Config == nil { |
|||
continue |
|||
} |
|||
prefix := normalizePrefix(cred.Prefix) |
|||
if prefix == "" { |
|||
if bestLen < 0 { |
|||
bestLen = 0 |
|||
bestConfig = cred.Config |
|||
} |
|||
continue |
|||
} |
|||
if normalizedTarget == prefix || strings.HasPrefix(normalizedTarget, prefix+"/") { |
|||
if len(prefix) > bestLen { |
|||
bestLen = len(prefix) |
|||
bestConfig = cred.Config |
|||
} |
|||
} |
|||
} |
|||
if bestConfig != nil { |
|||
return bestConfig, nil |
|||
} |
|||
return nil, fmt.Errorf("none of the %d storage credentials matched prefix %s", len(load.StorageCredentials), targetPrefix) |
|||
} |
|||
} |
|||
|
|||
func normalizePrefix(prefix string) string { |
|||
p := strings.TrimSpace(prefix) |
|||
p = strings.TrimSuffix(p, "/") |
|||
return p |
|||
} |
|||
|
|||
func findConfigWithKeys(load *loadTableResponse, targetPrefix string) map[string]string { |
|||
normalizedTarget := normalizePrefix(targetPrefix) |
|||
if normalizedTarget == "" { |
|||
return nil |
|||
} |
|||
|
|||
hasKeys := func(config map[string]string) bool { |
|||
if config == nil { |
|||
return false |
|||
} |
|||
accessKey := lookupValue(config, "s3.access-key-id", "aws.access-key-id", "s3.accessKeyId", "aws.accessKeyId", "accessKeyId", "access_key_id") |
|||
secretKey := lookupValue(config, "s3.secret-access-key", "aws.secret-access-key", "s3.secretAccessKey", "aws.secretAccessKey", "secretAccessKey", "secret_access_key") |
|||
return accessKey != "" && secretKey != "" |
|||
} |
|||
|
|||
bestConfig := map[string]string(nil) |
|||
bestLen := -1 |
|||
for _, cred := range load.StorageCredentials { |
|||
if !hasKeys(cred.Config) { |
|||
continue |
|||
} |
|||
prefix := normalizePrefix(cred.Prefix) |
|||
if prefix == "" { |
|||
if bestLen < 0 { |
|||
bestLen = 0 |
|||
bestConfig = cred.Config |
|||
} |
|||
continue |
|||
} |
|||
if normalizedTarget == prefix || strings.HasPrefix(normalizedTarget, prefix+"/") { |
|||
if len(prefix) > bestLen { |
|||
bestLen = len(prefix) |
|||
bestConfig = cred.Config |
|||
} |
|||
} |
|||
} |
|||
if bestConfig != nil { |
|||
return bestConfig |
|||
} |
|||
|
|||
if len(load.StorageCredentials) == 0 && hasKeys(load.Config) { |
|||
return load.Config |
|||
} |
|||
if hasKeys(load.Config) { |
|||
return load.Config |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func lookupValue(config map[string]string, keys ...string) string { |
|||
for _, key := range keys { |
|||
if val, ok := config[key]; ok && val != "" { |
|||
return val |
|||
} |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func s3URIToKeyPrefix(uri, bucket string) (string, error) { |
|||
prefix := "s3://" + bucket + "/" |
|||
if !strings.HasPrefix(uri, prefix) { |
|||
return "", fmt.Errorf("uri %q does not match bucket %q", uri, bucket) |
|||
} |
|||
keyPrefix := strings.TrimPrefix(uri, prefix) |
|||
keyPrefix = strings.TrimPrefix(keyPrefix, "/") |
|||
if keyPrefix == "" { |
|||
return "", fmt.Errorf("empty key prefix in uri %q", uri) |
|||
} |
|||
return keyPrefix, nil |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue