From 551da8274c18017e314727f5d8b7dc2424e45016 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Feb 2026 23:37:20 -0800 Subject: [PATCH] iceberg: add stage-create rollout flag and marker pruning --- weed/s3api/iceberg/iceberg.go | 71 ++++++++++++++++++- .../iceberg/iceberg_create_table_test.go | 17 +++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 495618e0a..067ed0842 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math/rand/v2" "net/http" "net/url" @@ -325,6 +326,53 @@ func stageCreateMarkerDir(bucketName string, namespace []string, tableName strin return path.Join(s3tables.TablesPath, bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName) } +func (s *Server) pruneExpiredStageCreateMarkers(ctx context.Context, bucketName string, namespace []string, tableName string) error { + opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + markerDir := stageCreateMarkerDir(bucketName, namespace, tableName) + now := time.Now().UTC() + + return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.ListEntries(opCtx, &filer_pb.ListEntriesRequest{ + Directory: markerDir, + Limit: 1024, + }) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) || strings.Contains(strings.ToLower(err.Error()), "not found") { + return nil + } + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr == io.EOF { + return nil + } + if recvErr != nil { + return recvErr + } + if resp.Entry == nil || resp.Entry.IsDirectory || len(resp.Entry.Content) == 0 { + continue + } + + var marker stageCreateMarker + if err := json.Unmarshal(resp.Entry.Content, &marker); err != nil { + continue + } + expiresAt, err := time.Parse(time.RFC3339Nano, marker.ExpiresAt) + if err != nil || !expiresAt.Before(now) { + continue + } + + if err := filer_pb.DoRemove(opCtx, client, markerDir, resp.Entry.Name, true, false, true, false, nil); err != nil && !errors.Is(err, filer_pb.ErrNotFound) { + return err + } + } + }) +} + func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string, namespace []string, tableName string, tableUUID uuid.UUID, location string) error { opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -340,6 +388,10 @@ func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string, return err } + if err := s.pruneExpiredStageCreateMarkers(ctx, bucketName, namespace, tableName); err != nil { + return err + } + return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ensureDir := func(parent, name, errorContext string) error { _, lookupErr := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{ @@ -671,6 +723,19 @@ func validateCreateTableRequest(req CreateTableRequest) error { return nil } +func isStageCreateEnabled() bool { + value := strings.TrimSpace(strings.ToLower(os.Getenv("ICEBERG_ENABLE_STAGE_CREATE"))) + if value == "" { + return true + } + switch value { + case "0", "false", "no", "off": + return false + default: + return true + } +} + // parseNamespace parses the namespace from path parameter. // Iceberg uses unit separator (0x1F) for multi-level namespaces. // Note: mux already decodes URL-encoded path parameters, so we only split by unit separator. @@ -1120,6 +1185,10 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "BadRequestException", err.Error()) return } + if req.StageCreate && !isStageCreateEnabled() { + writeError(w, http.StatusNotImplemented, "NotImplementedException", "stage-create is disabled") + return + } bucketName := getBucketFromPrefix(r) bucketARN := buildTableBucketARN(bucketName) @@ -1489,7 +1558,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { maxCommitAttempts := 3 generatedLegacyUUID := uuid.New() - canCreateOnCommit := hasAssertCreateRequirement(req.Requirements) + canCreateOnCommit := isStageCreateEnabled() && hasAssertCreateRequirement(req.Requirements) for attempt := 1; attempt <= maxCommitAttempts; attempt++ { getReq := &s3tables.GetTableRequest{ TableBucketARN: bucketARN, diff --git a/weed/s3api/iceberg/iceberg_create_table_test.go b/weed/s3api/iceberg/iceberg_create_table_test.go index bc945cd5f..78e1234e1 100644 --- a/weed/s3api/iceberg/iceberg_create_table_test.go +++ b/weed/s3api/iceberg/iceberg_create_table_test.go @@ -18,3 +18,20 @@ func TestValidateCreateTableRequestAcceptsWithName(t *testing.T) { t.Fatalf("validateCreateTableRequest() error = %v, want nil", err) } } + +func TestIsStageCreateEnabledDefaultsToTrue(t *testing.T) { + t.Setenv("ICEBERG_ENABLE_STAGE_CREATE", "") + if !isStageCreateEnabled() { + t.Fatalf("isStageCreateEnabled() = false, want true") + } +} + +func TestIsStageCreateEnabledFalseValues(t *testing.T) { + falseValues := []string{"0", "false", "FALSE", "no", "off"} + for _, value := range falseValues { + t.Setenv("ICEBERG_ENABLE_STAGE_CREATE", value) + if isStageCreateEnabled() { + t.Fatalf("isStageCreateEnabled() = true for value %q, want false", value) + } + } +}