diff --git a/test/s3tables/stage_create/iceberg_stage_create_integration_test.go b/test/s3tables/stage_create/iceberg_stage_create_integration_test.go new file mode 100644 index 000000000..5198e7066 --- /dev/null +++ b/test/s3tables/stage_create/iceberg_stage_create_integration_test.go @@ -0,0 +1,181 @@ +package stage_create + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" +) + +type miniEnv struct { + weedBinary string + dataDir string + icebergPort int + masterPort int + masterGrpc int + filerPort int + filerGrpc int + volumePort int + volumeGrpc int + cmd *exec.Cmd + cancel context.CancelFunc +} + +func getFreePort(t *testing.T) int { + t.Helper() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to allocate free port: %v", err) + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port +} + +func findRepoRoot(t *testing.T) string { + t.Helper() + wd, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get working directory: %v", err) + } + root := wd + for i := 0; i < 8; i++ { + if _, err := os.Stat(filepath.Join(root, "go.mod")); err == nil { + return root + } + root = filepath.Dir(root) + } + t.Fatalf("failed to locate repo root from %s", wd) + return "" +} + +func startMini(t *testing.T) *miniEnv { + t.Helper() + root := findRepoRoot(t) + + weedBinary := filepath.Join(root, "weed", "weed") + if _, err := os.Stat(weedBinary); os.IsNotExist(err) { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found; skipping integration test") + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-stage-create-it-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + + env := &miniEnv{ + weedBinary: weedBinary, + dataDir: dataDir, + icebergPort: getFreePort(t), + masterPort: getFreePort(t), + masterGrpc: getFreePort(t), + filerPort: getFreePort(t), + filerGrpc: getFreePort(t), + volumePort: getFreePort(t), + volumeGrpc: getFreePort(t), + } + + ctx, cancel := context.WithCancel(context.Background()) + env.cancel = cancel + + env.cmd = exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpc), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpc), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpc), + "-s3.port", "0", + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + env.cmd.Stdout = os.Stdout + env.cmd.Stderr = os.Stderr + if err := env.cmd.Start(); err != nil { + t.Fatalf("failed to start weed mini: %v", err) + } + + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort)) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return env + } + } + time.Sleep(300 * time.Millisecond) + } + + env.cleanup(t) + t.Fatalf("iceberg endpoint not ready") + return nil +} + +func (e *miniEnv) cleanup(t *testing.T) { + t.Helper() + if e.cancel != nil { + e.cancel() + } + if e.cmd != nil { + _, _ = e.cmd.Process.Wait() + } + if e.dataDir != "" { + _ = os.RemoveAll(e.dataDir) + } +} + +func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + env := startMini(t) + defer env.cleanup(t) + + reqBody := `{"stage-create":true}` + url := fmt.Sprintf("http://127.0.0.1:%d/v1/namespaces/ns1/tables", env.icebergPort) + req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("failed to build request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("request failed: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadRequest { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("status = %d, want %d, body=%s", resp.StatusCode, http.StatusBadRequest, string(body)) + } + + var decoded map[string]map[string]any + if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + errorObj, ok := decoded["error"] + if !ok { + t.Fatalf("response missing error object: %#v", decoded) + } + if got := errorObj["type"]; got != "BadRequestException" { + t.Fatalf("error.type = %v, want BadRequestException", got) + } + msg, _ := errorObj["message"].(string) + if !strings.Contains(strings.ToLower(msg), "table name is required") { + t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required") + } +} diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index f673fe3b8..5ae16b5e5 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -322,6 +322,7 @@ type statisticsUpdate struct { } var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes") +var errTableNameRequired = errors.New("table name is required") type commitAction struct { Action string `json:"action"` @@ -538,6 +539,13 @@ func parseMetadataVersionFromLocation(metadataLocation string) int { return version } +func validateCreateTableRequest(req CreateTableRequest) error { + if req.Name == "" { + return errTableNameRequired + } + return nil +} + // parseNamespace parses the namespace from path parameter. // Iceberg uses unit separator (0x1F) for multi-level namespaces. // Note: mux already decodes URL-encoded path parameters, so we only split by unit separator. @@ -983,8 +991,8 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { return } - if req.Name == "" { - writeError(w, http.StatusBadRequest, "BadRequestException", "Table name is required") + if err := validateCreateTableRequest(req); err != nil { + writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) return } diff --git a/weed/s3api/iceberg/iceberg_create_table_test.go b/weed/s3api/iceberg/iceberg_create_table_test.go new file mode 100644 index 000000000..bc945cd5f --- /dev/null +++ b/weed/s3api/iceberg/iceberg_create_table_test.go @@ -0,0 +1,20 @@ +package iceberg + +import ( + "errors" + "testing" +) + +func TestValidateCreateTableRequestRequiresName(t *testing.T) { + err := validateCreateTableRequest(CreateTableRequest{}) + if !errors.Is(err, errTableNameRequired) { + t.Fatalf("validateCreateTableRequest() error = %v, want errTableNameRequired", err) + } +} + +func TestValidateCreateTableRequestAcceptsWithName(t *testing.T) { + err := validateCreateTableRequest(CreateTableRequest{Name: "orders"}) + if err != nil { + t.Fatalf("validateCreateTableRequest() error = %v, want nil", err) + } +}