Browse Source

iceberg: implement stage-create and create-on-commit finalize

pull/8279/head
Chris Lu 2 days ago
parent
commit
6c82219240
  1. 322
      weed/s3api/iceberg/iceberg.go
  2. 37
      weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

322
weed/s3api/iceberg/iceberg.go

@ -288,6 +288,34 @@ func (s *Server) deleteMetadataFile(ctx context.Context, bucketName, tablePath,
}) })
} }
// loadMetadataFile looks up a metadata file through the filer and returns a
// private copy of the entry's inline Content bytes.
//
// The file is searched for in
// <s3tables.TablesPath>/<bucketName>[/<tablePath>]/metadata, where the
// tablePath segment is only included when it is non-empty. The lookup runs
// under a 30-second timeout derived from ctx. Any error reported by the filer
// client is returned unchanged together with a nil slice.
func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, metadataFileName string) ([]byte, error) {
	lookupCtx, cancelLookup := context.WithTimeout(ctx, 30*time.Second)
	defer cancelLookup()

	// Collect the directory segments first, then join them once.
	segments := []string{s3tables.TablesPath, bucketName}
	if tablePath != "" {
		segments = append(segments, tablePath)
	}
	segments = append(segments, "metadata")

	lookupReq := &filer_pb.LookupDirectoryEntryRequest{
		Directory: path.Join(segments...),
		Name:      metadataFileName,
	}

	var data []byte
	lookupErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := filer_pb.LookupEntry(lookupCtx, client, lookupReq)
		if err != nil {
			return err
		}
		// Copy so the returned bytes do not alias the response's buffer.
		data = append([]byte(nil), resp.Entry.Content...)
		return nil
	})
	if lookupErr != nil {
		return nil, lookupErr
	}
	return data, nil
}
type statisticsUpdate struct { type statisticsUpdate struct {
set *table.StatisticsFile set *table.StatisticsFile
remove *int64 remove *int64
@ -464,6 +492,52 @@ func isS3TablesConflict(err error) bool {
return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict return errors.As(err, &tableErr) && tableErr.Type == s3tables.ErrCodeConflict
} }
// isS3TablesNotFound reports whether err describes a missing table or
// namespace.
//
// It returns true when the error text contains "not found" (case-insensitive),
// or when err wraps an *s3tables.S3TablesError whose Type is ErrCodeNoSuchTable
// or ErrCodeNoSuchNamespace, or whose Message contains "not found"
// (case-insensitive). A nil error is never "not found".
func isS3TablesNotFound(err error) bool {
	const notFoundMarker = "not found"

	if err == nil {
		return false
	}
	if strings.Contains(strings.ToLower(err.Error()), notFoundMarker) {
		return true
	}

	var s3Err *s3tables.S3TablesError
	if !errors.As(err, &s3Err) {
		return false
	}
	if s3Err.Type == s3tables.ErrCodeNoSuchTable {
		return true
	}
	if s3Err.Type == s3tables.ErrCodeNoSuchNamespace {
		return true
	}
	return strings.Contains(strings.ToLower(s3Err.Message), notFoundMarker)
}
// hasAssertCreateRequirement reports whether any entry in requirements has a
// type equal to requirementAssertCreate. An empty or nil list yields false.
func hasAssertCreateRequirement(requirements table.Requirements) bool {
	for i := range requirements {
		if requirements[i].GetType() == requirementAssertCreate {
			return true
		}
	}
	return false
}
// isS3TablesAlreadyExists reports whether err describes a table or namespace
// that already exists.
//
// It returns true when the error text contains "already exists"
// (case-insensitive), or when err wraps an *s3tables.S3TablesError whose Type
// is ErrCodeTableAlreadyExists or ErrCodeNamespaceAlreadyExists, or whose
// Message contains "already exists" (case-insensitive). A nil error yields
// false.
func isS3TablesAlreadyExists(err error) bool {
	const alreadyExistsMarker = "already exists"

	if err == nil {
		return false
	}
	if strings.Contains(strings.ToLower(err.Error()), alreadyExistsMarker) {
		return true
	}

	var s3Err *s3tables.S3TablesError
	if !errors.As(err, &s3Err) {
		return false
	}
	if s3Err.Type == s3tables.ErrCodeTableAlreadyExists {
		return true
	}
	if s3Err.Type == s3tables.ErrCodeNamespaceAlreadyExists {
		return true
	}
	return strings.Contains(strings.ToLower(s3Err.Message), alreadyExistsMarker)
}
// parseMetadataVersionFromLocation extracts the version number N from a
// metadata location whose final path element has the form
// "v<N>.metadata.json".
//
// It returns 0 when the file name does not follow that pattern, when <N> is
// not a plain run of decimal digits, or when the parsed value is not strictly
// positive (including values too large for an int).
func parseMetadataVersionFromLocation(metadataLocation string) int {
	const (
		versionPrefix  = "v"
		metadataSuffix = ".metadata.json"
	)

	base := path.Base(metadataLocation)
	if !strings.HasPrefix(base, versionPrefix) || !strings.HasSuffix(base, metadataSuffix) {
		return 0
	}
	rawVersion := strings.TrimPrefix(strings.TrimSuffix(base, metadataSuffix), versionPrefix)

	// strconv.Atoi accepts a leading sign, so "v+2.metadata.json" would
	// otherwise be treated as version 2. Requiring a leading digit rejects
	// signed forms; Atoi rejects any other non-digit character itself.
	if rawVersion == "" || rawVersion[0] < '0' || rawVersion[0] > '9' {
		return 0
	}

	version, err := strconv.Atoi(rawVersion)
	if err != nil || version <= 0 {
		return 0
	}
	return version
}
// parseNamespace parses the namespace from path parameter. // parseNamespace parses the namespace from path parameter.
// Iceberg uses unit separator (0x1F) for multi-level namespaces. // Iceberg uses unit separator (0x1F) for multi-level namespaces.
// Note: mux already decodes URL-encoded path parameters, so we only split by unit separator. // Note: mux already decodes URL-encoded path parameters, so we only split by unit separator.
@ -562,8 +636,9 @@ func buildTableBucketARN(bucketName string) string {
} }
const ( const (
defaultListPageSize = 1000
maxListPageSize = 1000
defaultListPageSize = 1000
maxListPageSize = 1000
requirementAssertCreate = "assert-create"
) )
func getPaginationQueryParam(r *http.Request, primary, fallback string) string { func getPaginationQueryParam(r *http.Request, primary, fallback string) string {
@ -965,17 +1040,25 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
tableName := req.Name tableName := req.Name
metadataFileName := "v1.metadata.json" // Initial version is always 1 metadataFileName := "v1.metadata.json" // Initial version is always 1
metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName) metadataLocation := fmt.Sprintf("%s/metadata/%s", location, metadataFileName)
if !req.StageCreate {
// Save metadata file to filer for immediate table creation.
metadataBucket, metadataPath, err := parseS3Location(location)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
return
}
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
return
metadataBucket, metadataPath, err := parseS3Location(location)
if err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+err.Error())
return
}
if err := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); err != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+err.Error())
return
}
// Stage-create writes the initial metadata file but does not register table state in S3Tables.
if req.StageCreate {
result := LoadTableResult{
MetadataLocation: metadataLocation,
Metadata: metadata,
Config: make(iceberg.Properties),
} }
writeJSON(w, http.StatusOK, result)
return
} }
// Use S3 Tables manager to create table // Use S3 Tables manager to create table
@ -1267,6 +1350,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
maxCommitAttempts := 3 maxCommitAttempts := 3
generatedLegacyUUID := uuid.New() generatedLegacyUUID := uuid.New()
canCreateOnCommit := hasAssertCreateRequirement(req.Requirements)
for attempt := 1; attempt <= maxCommitAttempts; attempt++ { for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
getReq := &s3tables.GetTableRequest{ getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN, TableBucketARN: bucketARN,
@ -1280,8 +1364,218 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName) return s.tablesManager.Execute(r.Context(), mgrClient, "GetTable", getReq, &getResp, identityName)
}) })
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") {
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
if isS3TablesNotFound(err) {
if !canCreateOnCommit {
writeError(w, http.StatusNotFound, "NoSuchTableException", fmt.Sprintf("Table does not exist: %s", tableName))
return
}
for _, requirement := range req.Requirements {
if requirementErr := requirement.Validate(nil); requirementErr != nil {
writeError(w, http.StatusConflict, "CommitFailedException", "Requirement failed: "+requirementErr.Error())
return
}
}
location := fmt.Sprintf("s3://%s/%s/%s", bucketName, encodeNamespace(namespace), tableName)
tableUUID := generatedLegacyUUID
baseMetadataVersion := 0
baseMetadataLocation := ""
metadataBucket, metadataPath, parseLocationErr := parseS3Location(location)
if parseLocationErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+parseLocationErr.Error())
return
}
stagedFileName := "v1.metadata.json"
stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), metadataBucket, metadataPath, stagedFileName)
if loadErr == nil && len(stagedMetadataBytes) > 0 {
stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes)
if parseErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error())
return
}
baseMetadataLocation = fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), stagedFileName)
baseMetadataVersion = parseMetadataVersionFromLocation(baseMetadataLocation)
if stagedMetadata.TableUUID() != uuid.Nil {
tableUUID = stagedMetadata.TableUUID()
}
builder, builderErr := table.MetadataBuilderFromBase(stagedMetadata, baseMetadataLocation)
if builderErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
return
}
for _, update := range req.Updates {
if applyErr := update.Apply(builder); applyErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
return
}
}
newMetadata, buildErr := builder.Build()
if buildErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
return
}
metadataVersion := baseMetadataVersion + 1
if metadataVersion <= 0 {
metadataVersion = 1
}
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
metadataBytes, marshalErr := json.Marshal(newMetadata)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
return
}
metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
if marshalErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
return
}
newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
return
}
if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
return
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
Format: "ICEBERG",
Metadata: &s3tables.TableMetadata{
Iceberg: &s3tables.IcebergMetadata{
TableUUID: tableUUID.String(),
},
FullMetadata: metadataBytes,
},
MetadataVersion: metadataVersion,
MetadataLocation: newMetadataLocation,
}
createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
})
if createErr != nil {
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
}
if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
return
}
glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
return
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
Metadata: newMetadata,
}
writeJSON(w, http.StatusOK, result)
return
}
if loadErr != nil && !errors.Is(loadErr, filer_pb.ErrNotFound) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error())
return
}
currentMetadata := newTableMetadata(tableUUID, location, nil, nil, nil, nil)
if currentMetadata == nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
return
}
builder, builderErr := table.MetadataBuilderFromBase(currentMetadata, baseMetadataLocation)
if builderErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
return
}
for _, update := range req.Updates {
if applyErr := update.Apply(builder); applyErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
return
}
}
newMetadata, buildErr := builder.Build()
if buildErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
return
}
metadataVersion := 1
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
metadataBytes, marshalErr := json.Marshal(newMetadata)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
return
}
metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
if marshalErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
return
}
newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
return
}
if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
return
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
Format: "ICEBERG",
Metadata: &s3tables.TableMetadata{
Iceberg: &s3tables.IcebergMetadata{
TableUUID: tableUUID.String(),
},
FullMetadata: metadataBytes,
},
MetadataVersion: metadataVersion,
MetadataLocation: newMetadataLocation,
}
createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
})
if createErr != nil {
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
}
if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
return
}
glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
return
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
Metadata: newMetadata,
}
writeJSON(w, http.StatusOK, result)
return return
} }
glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err) glog.V(1).Infof("Iceberg: CommitTable GetTable error: %v", err)

37
weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

@ -0,0 +1,37 @@
package iceberg
import (
"testing"
"github.com/apache/iceberg-go/table"
)
// TestHasAssertCreateRequirement checks that hasAssertCreateRequirement
// detects an assert-create requirement and ignores a list that only holds an
// unrelated requirement.
func TestHasAssertCreateRequirement(t *testing.T) {
	withAssertCreate := table.Requirements{table.AssertCreate()}
	if found := hasAssertCreateRequirement(withAssertCreate); !found {
		t.Fatalf("hasAssertCreateRequirement() = false, want true")
	}

	withoutAssertCreate := table.Requirements{table.AssertDefaultSortOrderID(0)}
	if found := hasAssertCreateRequirement(withoutAssertCreate); found {
		t.Fatalf("hasAssertCreateRequirement() = true, want false")
	}
}
// TestParseMetadataVersionFromLocation verifies version extraction from
// metadata locations, including inputs that must be rejected with 0.
func TestParseMetadataVersionFromLocation(t *testing.T) {
	testCases := []struct {
		location string
		version  int
	}{
		// Well-formed locations.
		{location: "s3://b/ns/t/metadata/v1.metadata.json", version: 1},
		{location: "s3://b/ns/t/metadata/v25.metadata.json", version: 25},
		{location: "v7.metadata.json", version: 7},
		// Names that do not follow the v<N>.metadata.json pattern.
		{location: "s3://b/ns/t/metadata/current.json", version: 0},
		{location: "s3://b/ns/t/metadata/V3.metadata.json", version: 0},
		{location: "s3://b/ns/t/metadata/v2.metadata.json.bak", version: 0},
		{location: "", version: 0},
		// Version segments that are missing, non-numeric, or not positive.
		{location: "s3://b/ns/t/metadata/v.metadata.json", version: 0},
		{location: "s3://b/ns/t/metadata/vabc.metadata.json", version: 0},
		{location: "s3://b/ns/t/metadata/v0.metadata.json", version: 0},
		{location: "s3://b/ns/t/metadata/v-3.metadata.json", version: 0},
	}

	// Errorf (not Fatalf) so every failing row is reported in a single run.
	for _, tc := range testCases {
		if got := parseMetadataVersionFromLocation(tc.location); got != tc.version {
			t.Errorf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version)
		}
	}
}
Loading…
Cancel
Save