diff --git a/.github/workflows/s3-spark-tests.yml b/.github/workflows/s3-spark-tests.yml new file mode 100644 index 000000000..2fe49b123 --- /dev/null +++ b/.github/workflows/s3-spark-tests.yml @@ -0,0 +1,80 @@ +name: "S3 Spark Integration Tests" + +on: + pull_request: + paths: + - 'weed/s3api/**' + - 'test/s3/spark/**' + - 'test/s3tables/testutil/**' + - '.github/workflows/s3-spark-tests.yml' + workflow_dispatch: + +concurrency: + group: ${{ github.head_ref }}/s3-spark-tests + cancel-in-progress: true + +permissions: + contents: read + +jobs: + s3-spark-issue-repro-tests: + name: S3 Spark Issue Reproduction Tests + runs-on: ubuntu-22.04 + timeout-minutes: 45 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v3 + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Pre-pull Spark image + run: docker pull apache/spark:3.5.8 + + - name: Run S3 Spark integration tests + working-directory: test/s3/spark + timeout-minutes: 35 + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting S3 Spark Integration Tests ===" + + go test -v -timeout 30m . 2>&1 | tee test-output.log || { + echo "S3 Spark integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3/spark + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker|spark)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: s3-spark-test-logs + path: test/s3/spark/test-output.log + retention-days: 3 diff --git a/test/s3/spark/issue_8234_repro_test.go b/test/s3/spark/issue_8234_repro_test.go new file mode 100644 index 000000000..7f78ae9f5 --- /dev/null +++ b/test/s3/spark/issue_8234_repro_test.go @@ -0,0 +1,76 @@ +package spark + +import ( + "strings" + "testing" +) + +func setupSparkIssue8234Env(t *testing.T) *TestEnvironment { + t.Helper() + + env := NewTestEnvironment() + if !env.dockerAvailable { + t.Skip("Docker not available, skipping Spark S3 integration test") + } + + env.StartSeaweedFS(t) + t.Cleanup(func() { env.Cleanup() }) + + createObjectBucket(t, env, "test") + env.startSparkContainer(t) + return env +} + +func TestSparkS3AppendIssue8234Regression(t *testing.T) { + if testing.Short() { + t.Skip("Skipping Spark integration test in short mode") + } + + env := setupSparkIssue8234Env(t) + + script := ` +import sys +import pyspark.sql.types as T + +target = "s3a://test/test_table/" +sc = spark.sparkContext +df = sc.parallelize([1, 2, 3]).toDF(T.IntegerType()) + +df.write.format("parquet").mode("append").save(target) +print("FIRST_APPEND_OK") + +try: + df.write.format("parquet").mode("append").save(target) + print("SECOND_APPEND_OK") + spark.stop() + sys.exit(0) +except Exception as err: + message = str(err) + print("SECOND_APPEND_FAILED") + print(message) + if "Copy Source must mention the source bucket and key" in message: + print("ISSUE_8234_REGRESSION") + spark.stop() + sys.exit(4) + spark.stop() + sys.exit(2) +` + + code, output := runSparkPyScript(t, env.sparkContainer, script, env.s3Port) + if code != 0 { + if strings.Contains(output, "ISSUE_8234_REGRESSION") { + t.Fatalf("issue #8234 regression detected; output:\n%s", output) + } + t.Fatalf("Spark script exited with code %d; output:\n%s", code, output) + } + + if !strings.Contains(output, "FIRST_APPEND_OK") { + t.Fatalf("expected first append success marker in output, got:\n%s", output) + } + if !strings.Contains(output, "SECOND_APPEND_OK") { + t.Fatalf("expected second append success marker in output, got:\n%s", output) + } + if strings.Contains(output, "Copy Source must mention the source bucket and key") { + t.Fatalf("unexpected issue #8234 error in output:\n%s", output) + } +} diff --git a/test/s3/spark/setup_test.go b/test/s3/spark/setup_test.go new file mode 100644 index 000000000..eadd84d18 --- /dev/null +++ b/test/s3/spark/setup_test.go @@ -0,0 +1,319 @@ +package spark + +import ( + "context" + "fmt" + "io" + "math/rand" + "net" + "os" + "os/exec" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/testcontainers/testcontainers-go" +) + +var ( + miniProcessMu sync.Mutex + lastMiniProcess *exec.Cmd +) + +func stopPreviousMini() { + miniProcessMu.Lock() + defer miniProcessMu.Unlock() + + if lastMiniProcess != nil && lastMiniProcess.Process != nil { + _ = lastMiniProcess.Process.Kill() + _ = lastMiniProcess.Wait() + } + lastMiniProcess = nil +} + +func registerMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + lastMiniProcess = cmd + miniProcessMu.Unlock() +} + +func clearMiniProcess(cmd *exec.Cmd) { + miniProcessMu.Lock() + if lastMiniProcess == cmd { + lastMiniProcess = nil + } + miniProcessMu.Unlock() +} + +type TestEnvironment struct { + dockerAvailable bool + weedBinary string + seaweedfsDataDir string + masterPort int + filerPort int + s3Port int + accessKey string + secretKey string + sparkContainer testcontainers.Container + masterProcess *exec.Cmd +} + +func NewTestEnvironment() *TestEnvironment { + env := &TestEnvironment{ + accessKey: "test", + secretKey: "test", + } + + cmd := exec.Command("docker", "version") + env.dockerAvailable = cmd.Run() == nil + + if weedPath, err := exec.LookPath("weed"); err == nil { + env.weedBinary = weedPath + } + + return env +} + +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + if env.weedBinary == "" { + t.Skip("weed binary not found in PATH, skipping Spark S3 integration test") + } + + stopPreviousMini() + + var err error + env.seaweedfsDataDir, err = os.MkdirTemp("", "seaweed-s3-spark-test-") + if err != nil { + t.Fatalf("failed to create temp directory: %v", err) + } + + env.masterPort = mustFreePort(t, "Master") + env.filerPort = mustFreePort(t, "Filer") + env.s3Port = mustFreePort(t, "S3") + + bindIP := testutil.FindBindIP() + iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("failed to create IAM config: %v", err) + } + + env.masterProcess = exec.Command( + env.weedBinary, "mini", + "-ip", bindIP, + "-ip.bind", "0.0.0.0", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.config", iamConfigPath, + "-dir", env.seaweedfsDataDir, + ) + env.masterProcess.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + ) + + if err := env.masterProcess.Start(); err != nil { + t.Fatalf("failed to start weed mini: %v", err) + } + registerMiniProcess(env.masterProcess) + + if !waitForPort(env.masterPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) + } + if !waitForPort(env.filerPort, 15*time.Second) { + t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) + } + if !waitForPort(env.s3Port, 15*time.Second) { + t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) + } +} + +func (env *TestEnvironment) startSparkContainer(t *testing.T) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + req := testcontainers.ContainerRequest{ + Image: "apache/spark:3.5.1", + ExposedPorts: []string{"4040/tcp"}, + Env: map[string]string{ + "SPARK_LOCAL_IP": "localhost", + }, + ExtraHosts: []string{"host.docker.internal:host-gateway"}, + Cmd: []string{"/bin/sh", "-c", "sleep 7200"}, + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("failed to start spark container: %v", err) + } + env.sparkContainer = container +} + +func (env *TestEnvironment) Cleanup() { + if env.masterProcess != nil && env.masterProcess.Process != nil { + _ = env.masterProcess.Process.Kill() + _ = env.masterProcess.Wait() + } + clearMiniProcess(env.masterProcess) + + if env.sparkContainer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = env.sparkContainer.Terminate(ctx) + } + + if env.seaweedfsDataDir != "" { + _ = os.RemoveAll(env.seaweedfsDataDir) + } +} + +func mustFreePort(t *testing.T, name string) int { + t.Helper() + + for i := 0; i < 200; i++ { + port := 20000 + rand.Intn(30000) + listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + if err != nil { + continue + } + _ = listener.Close() + + grpcPort := port + 10000 + if grpcPort > 65535 { + continue + } + grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort)) + if err != nil { + continue + } + _ = grpcListener.Close() + return port + } + + t.Fatalf("failed to get free port for %s", name) + return 0 +} + +func waitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond) + if err == nil { + _ = conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} + +func runSparkPyScript(t *testing.T, container testcontainers.Container, script string, s3Port int) (int, string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Minute) + defer cancel() + + pythonScript := fmt.Sprintf(` +import glob +import os +import sys + +spark_home = os.environ.get("SPARK_HOME", "/opt/spark") +python_path = os.path.join(spark_home, "python") +py4j_glob = glob.glob(os.path.join(python_path, "lib", "py4j-*.zip")) +ivy_dir = "/tmp/ivy" +os.makedirs(ivy_dir, exist_ok=True) +os.environ["AWS_REGION"] = "us-east-1" +os.environ["AWS_DEFAULT_REGION"] = "us-east-1" +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + +if python_path not in sys.path: + sys.path.insert(0, python_path) +if py4j_glob and py4j_glob[0] not in sys.path: + sys.path.insert(0, py4j_glob[0]) + +from pyspark.sql import SparkSession + +spark = (SparkSession.builder + .master("local[2]") + .appName("SeaweedFS S3 Spark Issue 8234 Repro") + .config("spark.sql.catalogImplementation", "hive") + .config("spark.executor.extraJavaOptions", "-Djdk.tls.client.protocols=TLSv1") + .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED") + .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED") + .config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED") + .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED") + .config("spark.sql.parquet.enableVectorizedReader", "false") + .config("spark.sql.parquet.mergeSchema", "false") + .config("spark.sql.parquet.writeLegacyFormat", "true") + .config("spark.sql.broadcastTimeout", "-1") + .config("spark.network.timeout", "600") + .config("spark.hadoop.hive.metastore.schema.verification", "false") + .config("spark.hadoop.hive.metastore.schema.verification.record.version", "false") + .config("spark.jars.ivy", ivy_dir) + .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.12.262") + .config("spark.hadoop.fs.s3a.access.key", "test") + .config("spark.hadoop.fs.s3a.secret.key", "test") + .config("spark.hadoop.fs.s3a.endpoint", "host.docker.internal:%d") + .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1") + .config("spark.hadoop.fs.s3a.path.style.access", "true") + .config("spark.hadoop.fs.s3a.fast.upload", "true") + .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") + .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "true") + .config("spark.hadoop.fs.s3a.directory.marker.retention", "keep") + .config("spark.hadoop.fs.s3a.change.detection.version.required", "false") + .config("spark.hadoop.fs.s3a.change.detection.mode", "warn") + .config("spark.local.dir", "/tmp/spark-temp") + .getOrCreate()) + +%s +`, s3Port, script) + + code, out, err := container.Exec(ctx, []string{"python3", "-c", pythonScript}) + var output string + if out != nil { + outputBytes, readErr := io.ReadAll(out) + if readErr != nil { + output = fmt.Sprintf("failed to read container output: %v", readErr) + } else { + output = string(outputBytes) + } + } + if err != nil { + output = output + fmt.Sprintf("\ncontainer exec error: %v\n", err) + } + return code, output +} + +func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + cfg := aws.Config{ + Region: "us-east-1", + Credentials: aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(env.accessKey, env.secretKey, "")), + BaseEndpoint: aws.String(fmt.Sprintf("http://localhost:%d", env.s3Port)), + } + + client := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = true + }) + + _, err := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Fatalf("failed to create object bucket %s: %v", bucketName, err) + } +}