Browse Source

iceberg: persist and clean up stage-create markers

pull/8279/head
Chris Lu 1 day ago
parent
commit
6b8442bca6
  1. 137
      weed/s3api/iceberg/iceberg.go
  2. 21
      weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

137
weed/s3api/iceberg/iceberg.go

@ -10,6 +10,7 @@ import (
"fmt"
"math/rand/v2"
"net/http"
"net/url"
"os"
"path"
"strconv"
@ -316,6 +317,118 @@ func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, me
return content, nil
}
// stageCreateMarkerNamespaceKey turns a multi-level namespace into a single
// path segment: the namespace is first flattened with encodeNamespace and the
// result is then escaped with url.PathEscape so it can be used as one
// directory name under the stage-create marker tree.
func stageCreateMarkerNamespaceKey(namespace []string) string {
	encoded := encodeNamespace(namespace)
	return url.PathEscape(encoded)
}
func stageCreateMarkerDir(bucketName string, namespace []string, tableName string) string {
return path.Join(s3tables.TablesPath, bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName)
}
// writeStageCreateMarker persists a JSON marker describing a staged table
// creation.
//
// The marker records the table UUID, the table location, a creation timestamp
// and an expiry timestamp (creation time plus stageCreateMarkerTTL). It is
// written as "<tableUUID>.json" below
// <s3tables.TablesPath>/<bucketName>/<stageCreateMarkerDirName>/<namespace key>/<tableName>;
// directories missing along that path are created first. All filer calls share
// one 30-second timeout derived from ctx.
//
// It returns a non-nil error when the marker cannot be marshalled, when a
// directory lookup or creation fails, or when the marker entry itself cannot
// be created.
func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string, namespace []string, tableName string, tableUUID uuid.UUID, location string) error {
	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	// Sample the clock once so that ExpiresAt is exactly CreatedAt + TTL and
	// every entry written by this call carries the same timestamps.
	now := time.Now().UTC()
	unixNow := now.Unix()

	marker := stageCreateMarker{
		TableUUID: tableUUID.String(),
		Location:  location,
		CreatedAt: now.Format(time.RFC3339Nano),
		ExpiresAt: now.Add(stageCreateMarkerTTL).Format(time.RFC3339Nano),
	}
	content, err := json.Marshal(marker)
	if err != nil {
		return fmt.Errorf("marshal stage-create marker: %w", err)
	}

	return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// ensureDir creates directory name under parent unless a lookup
		// shows that an entry with that name is already present.
		ensureDir := func(parent, name, errorContext string) error {
			_, lookupErr := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
				Directory: parent,
				Name:      name,
			})
			if lookupErr == nil {
				return nil
			}
			// errors.Is also matches a wrapped ErrNotFound, consistent with
			// the check in deleteStageCreateMarkers.
			if !errors.Is(lookupErr, filer_pb.ErrNotFound) {
				return fmt.Errorf("lookup %s failed: %w", errorContext, lookupErr)
			}

			resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
				Directory: parent,
				Entry: &filer_pb.Entry{
					Name:        name,
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime:    unixNow,
						Crtime:   unixNow,
						FileMode: uint32(0755 | os.ModeDir),
					},
				},
			})
			if createErr != nil {
				return fmt.Errorf("failed to create %s: %w", errorContext, createErr)
			}
			// A response error mentioning "exist" is tolerated: presumably
			// another writer created the directory between lookup and create.
			if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
				return fmt.Errorf("failed to create %s: %s", errorContext, resp.Error)
			}
			return nil
		}

		// Walk the marker path one segment at a time, creating what is missing.
		segments := []string{bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName}
		currentDir := s3tables.TablesPath
		for _, segment := range segments {
			if segment == "" {
				continue
			}
			if err := ensureDir(currentDir, segment, "stage-create marker directory"); err != nil {
				return err
			}
			currentDir = path.Join(currentDir, segment)
		}

		entryName := tableUUID.String() + ".json"
		resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
			Directory: currentDir,
			Entry: &filer_pb.Entry{
				Name: entryName,
				Attributes: &filer_pb.FuseAttributes{
					Mtime:    unixNow,
					Crtime:   unixNow,
					FileMode: uint32(0644),
					FileSize: uint64(len(content)),
				},
				Content: content,
				Extended: map[string][]byte{
					"Mime-Type": []byte("application/json"),
				},
			},
		})
		if createErr != nil {
			return fmt.Errorf("create stage-create marker %s: %w", entryName, createErr)
		}
		if resp.Error != "" {
			return fmt.Errorf("create stage-create marker %s: %s", entryName, resp.Error)
		}
		return nil
	})
}
// deleteStageCreateMarkers removes the stage-create marker directory of the
// given table (see stageCreateMarkerDir) by asking the filer to remove that
// directory's base name from its parent directory. The call is bounded by a
// 30-second timeout derived from ctx.
//
// A filer_pb.ErrNotFound result is treated as success; any other error is
// returned unchanged.
func (s *Server) deleteStageCreateMarkers(ctx context.Context, bucketName string, namespace []string, tableName string) error {
	markerDir := stageCreateMarkerDir(bucketName, namespace, tableName)
	parentDir, targetName := path.Dir(markerDir), path.Base(markerDir)

	opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	remove := func(client filer_pb.SeaweedFilerClient) error {
		rmErr := filer_pb.DoRemove(opCtx, client, parentDir, targetName, true, true, true, false, nil)
		if !errors.Is(rmErr, filer_pb.ErrNotFound) {
			return rmErr
		}
		// Nothing to delete: the markers are already gone.
		return nil
	}
	return s.filerClient.WithFilerClient(false, remove)
}
type statisticsUpdate struct {
set *table.StatisticsFile
remove *int64
@ -324,6 +437,18 @@ type statisticsUpdate struct {
// ErrIncompleteSetStatistics reports a set-statistics update that lacks one of
// the fields named in its message: snapshot-id, statistics-path,
// file-size-in-bytes or file-footer-size-in-bytes.
var ErrIncompleteSetStatistics = errors.New("set-statistics requires snapshot-id, statistics-path, file-size-in-bytes, and file-footer-size-in-bytes")
// errTableNameRequired reports that no table name was supplied.
var errTableNameRequired = errors.New("table name is required")
// Settings for the stage-create markers.
const (
	// stageCreateMarkerTTL is the lifetime added to a marker's creation time
	// to compute its expiry timestamp.
	stageCreateMarkerTTL = 24 * time.Hour

	// stageCreateMarkerDirName is the directory name, placed directly below
	// the bucket directory, under which stage-create markers are stored.
	stageCreateMarkerDirName = ".iceberg_staged"
)
// stageCreateMarker is the JSON document stored for a staged table creation.
// Field order is significant: it determines the key order of the encoded JSON.
type stageCreateMarker struct {
	// TableUUID is the string form of the staged table's UUID.
	TableUUID string `json:"table_uuid"`
	// Location is the table location supplied when the marker was written.
	Location string `json:"location"`
	// CreatedAt is the marker creation time as an RFC 3339 (nanosecond) UTC string.
	CreatedAt string `json:"created_at"`
	// ExpiresAt is CreatedAt plus the marker TTL, in the same string format.
	ExpiresAt string `json:"expires_at"`
}
// commitAction captures only the "action" key of a JSON object.
// NOTE(review): presumably used to peek at the kind of a commit
// requirement/update before decoding it fully — confirm against callers.
type commitAction struct {
	// Action is the value of the object's "action" key.
	Action string `json:"action"`
}
@ -1060,6 +1185,9 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
// Stage-create writes the initial metadata file but does not register table state in S3Tables.
if req.StageCreate {
if markerErr := s.writeStageCreateMarker(r.Context(), metadataBucket, namespace, tableName, tableUUID, location); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to persist stage-create marker for %s.%s: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := LoadTableResult{
MetadataLocation: metadataLocation,
Metadata: metadata,
@ -1140,6 +1268,9 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
if finalLocation == "" {
finalLocation = metadataLocation
}
if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after create: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := LoadTableResult{
MetadataLocation: finalLocation,
@ -1487,6 +1618,9 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
return
}
if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
@ -1578,6 +1712,9 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
return
}
if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,

21
weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

@ -1,6 +1,7 @@
package iceberg
import (
"strings"
"testing"
"github.com/apache/iceberg-go/table"
@ -35,3 +36,23 @@ func TestParseMetadataVersionFromLocation(t *testing.T) {
}
}
}
// TestStageCreateMarkerNamespaceKey checks that the key for a two-level
// namespace is not the raw unit-separator-joined form and that the separator
// shows up percent-encoded as %1F.
func TestStageCreateMarkerNamespaceKey(t *testing.T) {
	got := stageCreateMarkerNamespaceKey([]string{"a", "b"})

	switch {
	case got == "a\x1fb":
		t.Fatalf("stageCreateMarkerNamespaceKey() returned unescaped namespace key %q", got)
	case !strings.Contains(got, "%1F"):
		t.Fatalf("stageCreateMarkerNamespaceKey() = %q, want escaped unit separator", got)
	}
}
// TestStageCreateMarkerDir checks that the marker directory path contains the
// marker directory segment and ends with the table name.
func TestStageCreateMarkerDir(t *testing.T) {
	got := stageCreateMarkerDir("warehouse", []string{"ns"}, "orders")

	hasMarkerSegment := strings.Contains(got, stageCreateMarkerDirName)
	if !hasMarkerSegment {
		t.Fatalf("stageCreateMarkerDir() = %q, want marker dir segment %q", got, stageCreateMarkerDirName)
	}

	endsWithTable := strings.HasSuffix(got, "/orders")
	if !endsWithTable {
		t.Fatalf("stageCreateMarkerDir() = %q, want suffix /orders", got)
	}
}
Loading…
Cancel
Save