From 6b8442bca674ce1d83ac52acc0eeb68acdaeb156 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 9 Feb 2026 23:35:32 -0800
Subject: [PATCH] iceberg: persist and cleanup stage-create markers

---
 weed/s3api/iceberg/iceberg.go                 | 177 ++++++++++++++++++
 .../iceberg_stage_create_helpers_test.go      |  21 +++
 2 files changed, 198 insertions(+)

diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go
index 5ae16b5e5..495618e0a 100644
--- a/weed/s3api/iceberg/iceberg.go
+++ b/weed/s3api/iceberg/iceberg.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"math/rand/v2"
 	"net/http"
+	"net/url"
 	"os"
 	"path"
 	"strconv"
@@ -316,6 +317,154 @@ func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, me
 	return content, nil
 }
 
+// stageCreateMarkerNamespaceKey returns the encodeNamespace form of namespace,
+// path-escaped so that it can be used as a single directory name.
+func stageCreateMarkerNamespaceKey(namespace []string) string {
+	return url.PathEscape(encodeNamespace(namespace))
+}
+
+// stageCreateMarkerDir returns the directory holding the stage-create markers of
+// one table: TablesPath/bucket/stageCreateMarkerDirName/namespace key/table.
+// path.Join drops empty elements, so an empty tableName yields the namespace
+// directory itself; callers must reject empty table names first.
+func stageCreateMarkerDir(bucketName string, namespace []string, tableName string) string {
+	return path.Join(s3tables.TablesPath, bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName)
+}
+
+// writeStageCreateMarker stores a JSON marker describing a staged table creation
+// as tableUUID.json inside the table's marker directory, creating missing
+// parent directories on the way. All filer calls share a 30-second timeout
+// derived from ctx.
+//
+// It returns errTableNameRequired for an empty tableName, because the marker
+// would otherwise land in the shared namespace directory.
+func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string, namespace []string, tableName string, tableUUID uuid.UUID, location string) error {
+	if tableName == "" {
+		return errTableNameRequired
+	}
+
+	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	// Use one timestamp so that ExpiresAt is exactly CreatedAt plus the TTL.
+	now := time.Now().UTC()
+	marker := stageCreateMarker{
+		TableUUID: tableUUID.String(),
+		Location:  location,
+		CreatedAt: now.Format(time.RFC3339Nano),
+		ExpiresAt: now.Add(stageCreateMarkerTTL).Format(time.RFC3339Nano),
+	}
+	content, err := json.Marshal(marker)
+	if err != nil {
+		return err
+	}
+
+	return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		// ensureDir creates the directory parent/name unless a lookup finds it.
+		ensureDir := func(parent, name, errorContext string) error {
+			_, lookupErr := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
+				Directory: parent,
+				Name:      name,
+			})
+			if lookupErr == nil {
+				return nil
+			}
+			// errors.Is also recognizes a wrapped ErrNotFound.
+			if !errors.Is(lookupErr, filer_pb.ErrNotFound) {
+				return fmt.Errorf("lookup %s failed: %w", errorContext, lookupErr)
+			}
+
+			resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
+				Directory: parent,
+				Entry: &filer_pb.Entry{
+					Name:        name,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime:    now.Unix(),
+						Crtime:   now.Unix(),
+						FileMode: uint32(0755 | os.ModeDir),
+					},
+				},
+			})
+			if createErr != nil {
+				return fmt.Errorf("failed to create %s: %w", errorContext, createErr)
+			}
+			// An error mentioning "exist" is tolerated (presumably the directory
+			// was created concurrently after the lookup above).
+			if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
+				return fmt.Errorf("failed to create %s: %s", errorContext, resp.Error)
+			}
+			return nil
+		}
+
+		segments := []string{bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName}
+		currentDir := s3tables.TablesPath
+		for _, segment := range segments {
+			if segment == "" {
+				continue
+			}
+			if err := ensureDir(currentDir, segment, "stage-create marker directory"); err != nil {
+				return err
+			}
+			currentDir = path.Join(currentDir, segment)
+		}
+
+		entryName := tableUUID.String() + ".json"
+		resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
+			Directory: currentDir,
+			Entry: &filer_pb.Entry{
+				Name: entryName,
+				Attributes: &filer_pb.FuseAttributes{
+					Mtime:    now.Unix(),
+					Crtime:   now.Unix(),
+					FileMode: uint32(0644),
+					FileSize: uint64(len(content)),
+				},
+				Content: content,
+				Extended: map[string][]byte{
+					"Mime-Type": []byte("application/json"),
+				},
+			},
+		})
+		if createErr != nil {
+			return fmt.Errorf("failed to create stage-create marker %s: %w", entryName, createErr)
+		}
+		if resp.Error != "" {
+			return fmt.Errorf("failed to create stage-create marker %s: %s", entryName, resp.Error)
+		}
+		return nil
+	})
+}
+
+// deleteStageCreateMarkers removes the stage-create marker directory of the
+// given table. A marker directory that does not exist is not an error. The
+// filer call is bounded by a 30-second timeout derived from ctx.
+//
+// It returns errTableNameRequired for an empty tableName: stageCreateMarkerDir
+// would then resolve to the namespace directory, and the removal would target
+// the markers of every table in that namespace.
+func (s *Server) deleteStageCreateMarkers(ctx context.Context, bucketName string, namespace []string, tableName string) error {
+	if tableName == "" {
+		return errTableNameRequired
+	}
+
+	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	markerDir := stageCreateMarkerDir(bucketName, namespace, tableName)
+	parentDir, targetName := path.Dir(markerDir), path.Base(markerDir)
+
+	return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		// NOTE(review): the flags are presumably isDeleteData, isRecursive,
+		// ignoreRecursiveErr, isFromOtherCluster - confirm against DoRemove.
+		err := filer_pb.DoRemove(opCtx, client, parentDir, targetName, true, true, true, false, nil)
+		if errors.Is(err, filer_pb.ErrNotFound) {
+			return nil
+		}
+		return err
+	})
+}
+
 type statisticsUpdate struct {
 	set    *table.StatisticsFile
 	remove *int64
@@ -324,6 +473,22 @@ type statisticsUpdate struct {
 var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes")
 var errTableNameRequired = errors.New("table name is required")
 
+// Stage-create markers live in the stageCreateMarkerDirName directory of a table
+// bucket; each marker is stamped to expire stageCreateMarkerTTL after creation.
+const (
+	stageCreateMarkerDirName = ".iceberg_staged"
+	stageCreateMarkerTTL     = 24 * time.Hour
+)
+
+// stageCreateMarker is the JSON document stored for one staged table creation.
+// CreatedAt and ExpiresAt hold UTC timestamps formatted as time.RFC3339Nano.
+type stageCreateMarker struct {
+	TableUUID string `json:"table_uuid"`
+	Location  string `json:"location"`
+	CreatedAt string `json:"created_at"`
+	ExpiresAt string `json:"expires_at"`
+}
+
 type commitAction struct {
 	Action string `json:"action"`
 }
@@ -1060,6 +1225,9 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
 
 	// Stage-create writes the initial metadata file but does not register table state in S3Tables.
if req.StageCreate { + if markerErr := s.writeStageCreateMarker(r.Context(), metadataBucket, namespace, tableName, tableUUID, location); markerErr != nil { + glog.V(1).Infof("Iceberg: failed to persist stage-create marker for %s.%s: %v", encodeNamespace(namespace), tableName, markerErr) + } result := LoadTableResult{ MetadataLocation: metadataLocation, Metadata: metadata, @@ -1140,6 +1268,9 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) { if finalLocation == "" { finalLocation = metadataLocation } + if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after create: %v", encodeNamespace(namespace), tableName, markerErr) + } result := LoadTableResult{ MetadataLocation: finalLocation, @@ -1487,6 +1618,9 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error()) return } + if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr) + } result := CommitTableResponse{ MetadataLocation: newMetadataLocation, @@ -1578,6 +1712,9 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error()) return } + if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil { + glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr) + } result := CommitTableResponse{ MetadataLocation: newMetadataLocation, 
diff --git a/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go
index 2fae43f60..d44910fe3 100644
--- a/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go
+++ b/weed/s3api/iceberg/iceberg_stage_create_helpers_test.go
@@ -1,6 +1,7 @@
 package iceberg
 
 import (
+	"strings"
 	"testing"
 
 	"github.com/apache/iceberg-go/table"
@@ -35,3 +36,23 @@ func TestParseMetadataVersionFromLocation(t *testing.T) {
 		}
 	}
 }
+
+// TestStageCreateMarkerNamespaceKey checks that the unit separator is escaped.
+func TestStageCreateMarkerNamespaceKey(t *testing.T) {
+	switch got := stageCreateMarkerNamespaceKey([]string{"a", "b"}); {
+	case got == "a\x1fb":
+		t.Fatalf("stageCreateMarkerNamespaceKey() returned unescaped namespace key %q", got)
+	case !strings.Contains(got, "%1F"):
+		t.Fatalf("stageCreateMarkerNamespaceKey() = %q, want escaped unit separator", got)
+	}
+}
+
+// TestStageCreateMarkerDir checks the marker segment and the table-name suffix.
+func TestStageCreateMarkerDir(t *testing.T) {
+	switch got := stageCreateMarkerDir("warehouse", []string{"ns"}, "orders"); {
+	case !strings.Contains(got, stageCreateMarkerDirName):
+		t.Fatalf("stageCreateMarkerDir() = %q, want marker dir segment %q", got, stageCreateMarkerDirName)
+	case !strings.HasSuffix(got, "/orders"):
+		t.Fatalf("stageCreateMarkerDir() = %q, want suffix /orders", got)
+	}
+}