Browse Source

iceberg: address review comments on create-on-commit and tests

pull/8279/head
Chris Lu 1 day ago
parent
commit
85b08436b3
  1. 16
      test/s3tables/catalog/iceberg_catalog_test.go
  2. 340
      weed/s3api/iceberg/iceberg.go
  3. 8
      weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

16
test/s3tables/catalog/iceberg_catalog_test.go

@ -304,6 +304,15 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) {
env.StartSeaweedFS(t)
createTableBucket(t, env, "warehouse")
status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{
"namespace": []string{"ns1"},
})
if err != nil {
t.Fatalf("Create namespace request failed: %v", err)
}
if status != http.StatusOK && status != http.StatusConflict {
t.Fatalf("Create namespace status = %d, want 200 or 409", status)
}
reqBody := `{"stage-create": true}`
url := fmt.Sprintf("%s/v1/namespaces/%s/tables", env.IcebergURL(), "ns1")
@ -324,13 +333,13 @@ func TestStageCreateMissingNameReturnsBadRequest(t *testing.T) {
t.Fatalf("Expected status 400, got %d: %s", resp.StatusCode, body)
}
var decoded map[string]map[string]any
var decoded map[string]any
if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
t.Fatalf("Failed to decode error response: %v", err)
}
errorObj, ok := decoded["error"]
errorObj, ok := decoded["error"].(map[string]any)
if !ok {
t.Fatalf("Response missing error object: %#v", decoded)
t.Fatalf("Response missing or invalid error object: %#v", decoded)
}
if got := errorObj["type"]; got != "BadRequestException" {
t.Fatalf("error.type = %v, want BadRequestException", got)
@ -456,6 +465,7 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
}
}
// doIcebergJSONRequest decodes JSON object responses used by catalog tests.
func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) {
url := env.IcebergURL() + path

340
weed/s3api/iceberg/iceberg.go

@ -309,6 +309,9 @@ func (s *Server) loadMetadataFile(ctx context.Context, bucketName, tablePath, me
if err != nil {
return err
}
if resp == nil || resp.Entry == nil {
return fmt.Errorf("lookup returned nil entry for %s/%s", metadataDir, metadataFileName)
}
content = append([]byte(nil), resp.Entry.Content...)
return nil
})
@ -502,6 +505,30 @@ type stageCreateMarker struct {
ExpiresAt string `json:"expires_at"`
}
// icebergRequestError describes a failed catalog request in the three parts
// the handler forwards to writeError: an HTTP status code, an error type
// string, and a human-readable message.
type icebergRequestError struct {
	status           int
	errType, message string
}

// Error implements the error interface by returning only the message; the
// status code and error type are not included in the string.
func (reqErr *icebergRequestError) Error() string {
	return reqErr.message
}
// createOnCommitInput bundles everything finalizeCreateOnCommit needs to turn
// a commit against a not-yet-registered table into the table's creation.
type createOnCommitInput struct {
// bucketARN is sent as TableBucketARN in the CreateTable request.
bucketARN string
// namespace is sent as Namespace in the CreateTable request and is also used
// when cleaning up stage-create markers.
namespace []string
// tableName is sent as Name in the CreateTable request.
tableName string
// identityName is passed to the tables manager when executing CreateTable.
identityName string
// location is the table location; it is parsed with parseS3Location and,
// with any trailing "/" trimmed, prefixes the new metadata file location.
location string
// tableUUID is stored (in string form) as the Iceberg TableUUID.
tableUUID uuid.UUID
// baseMetadata and baseMetadataLoc seed the metadata builder that the
// updates are applied to.
baseMetadata table.Metadata
baseMetadataLoc string
// baseMetadataVer is the version of the base metadata; the committed file
// uses baseMetadataVer+1, falling back to 1 when that is not positive.
baseMetadataVer int
// updates are applied to the builder in order before the metadata is built.
updates table.Updates
// statisticsUpdates are applied to the serialized metadata via
// applyStatisticsUpdates.
statisticsUpdates []statisticsUpdate
}
// commitAction carries a single Action string that is read from / written to
// the JSON key "action".
// NOTE(review): presumably used to inspect the action discriminator of a
// commit update or requirement — confirm against its callers.
type commitAction struct {
Action string `json:"action"`
}
@ -717,6 +744,129 @@ func parseMetadataVersionFromLocation(metadataLocation string) int {
return version
}
// finalizeCreateOnCommit completes a commit whose target table is not yet
// registered by creating the table from the supplied base metadata.
//
// Steps, in order:
//  1. apply input.updates on a builder seeded from input.baseMetadata and
//     build the resulting metadata;
//  2. serialize it, apply input.statisticsUpdates to the serialized bytes and
//     parse the result back;
//  3. write the bytes as v<N>.metadata.json under the table location, where N
//     is input.baseMetadataVer+1 (or 1 when that is not positive);
//  4. register the table through the tables manager ("CreateTable").
//
// If registration fails, the metadata file written in step 3 is deleted on a
// best-effort basis. A conflict / already-exists failure is reported as a 409
// CommitFailedException; any other registration failure as a 500. After a
// successful registration the stage-create markers are removed; a failure to
// do so is only logged.
//
// It returns the commit response on success, or a non-nil *icebergRequestError
// carrying the HTTP status, error type and message for the caller to write.
func (s *Server) finalizeCreateOnCommit(ctx context.Context, input createOnCommitInput) (*CommitTableResponse, *icebergRequestError) {
	// serverFault / clientFault build the 500 and 400 errors; the cause's text
	// is appended to prefix verbatim.
	serverFault := func(prefix string, cause error) *icebergRequestError {
		return &icebergRequestError{
			status:  http.StatusInternalServerError,
			errType: "InternalServerError",
			message: prefix + cause.Error(),
		}
	}
	clientFault := func(prefix string, cause error) *icebergRequestError {
		return &icebergRequestError{
			status:  http.StatusBadRequest,
			errType: "BadRequestException",
			message: prefix + cause.Error(),
		}
	}

	metaBuilder, builderErr := table.MetadataBuilderFromBase(input.baseMetadata, input.baseMetadataLoc)
	if builderErr != nil {
		return nil, serverFault("Failed to create metadata builder: ", builderErr)
	}
	for _, pending := range input.updates {
		if applyErr := pending.Apply(metaBuilder); applyErr != nil {
			return nil, clientFault("Failed to apply update: ", applyErr)
		}
	}
	built, buildErr := metaBuilder.Build()
	if buildErr != nil {
		return nil, clientFault("Failed to build new metadata: ", buildErr)
	}

	// The committed file is one version past the base; never below v1.
	nextVersion := 1
	if candidate := input.baseMetadataVer + 1; candidate > 0 {
		nextVersion = candidate
	}
	fileName := fmt.Sprintf("v%d.metadata.json", nextVersion)
	committedLocation := strings.TrimSuffix(input.location, "/") + "/metadata/" + fileName

	serialized, marshalErr := json.Marshal(built)
	if marshalErr != nil {
		return nil, serverFault("Failed to serialize metadata: ", marshalErr)
	}
	finalBytes, statsErr := applyStatisticsUpdates(serialized, input.statisticsUpdates)
	if statsErr != nil {
		return nil, clientFault("Failed to apply statistics updates: ", statsErr)
	}
	// Re-parse so the response reflects the bytes that are actually stored.
	committedMetadata, parseErr := table.ParseMetadataBytes(finalBytes)
	if parseErr != nil {
		return nil, serverFault("Failed to parse committed metadata: ", parseErr)
	}

	bucket, dirPath, locationErr := parseS3Location(input.location)
	if locationErr != nil {
		return nil, serverFault("Invalid table location: ", locationErr)
	}
	if saveErr := s.saveMetadataFile(ctx, bucket, dirPath, fileName, finalBytes); saveErr != nil {
		return nil, serverFault("Failed to save metadata file: ", saveErr)
	}

	request := &s3tables.CreateTableRequest{
		TableBucketARN: input.bucketARN,
		Namespace:      input.namespace,
		Name:           input.tableName,
		Format:         "ICEBERG",
		Metadata: &s3tables.TableMetadata{
			Iceberg: &s3tables.IcebergMetadata{
				TableUUID: input.tableUUID.String(),
			},
			FullMetadata: finalBytes,
		},
		MetadataVersion:  nextVersion,
		MetadataLocation: committedLocation,
	}
	registerErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return s.tablesManager.Execute(ctx, s3tables.NewManagerClient(client), "CreateTable", request, nil, input.identityName)
	})
	if registerErr != nil {
		// Best effort: remove the metadata file written above.
		cleanupErr := s.deleteMetadataFile(ctx, bucket, dirPath, fileName)
		if cleanupErr != nil {
			glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", committedLocation, cleanupErr)
		}
		lostRace := isS3TablesConflict(registerErr) || isS3TablesAlreadyExists(registerErr)
		if lostRace {
			return nil, &icebergRequestError{
				status:  http.StatusConflict,
				errType: "CommitFailedException",
				message: "Table was created concurrently",
			}
		}
		glog.Errorf("Iceberg: CommitTable CreateTable error: %v", registerErr)
		return nil, serverFault("Failed to commit table creation: ", registerErr)
	}

	// Marker cleanup is non-fatal: the table is already registered.
	if markerErr := s.deleteStageCreateMarkers(ctx, bucket, input.namespace, input.tableName); markerErr != nil {
		glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(input.namespace), input.tableName, markerErr)
	}

	response := &CommitTableResponse{
		MetadataLocation: committedLocation,
		Metadata:         committedMetadata,
	}
	return response, nil
}
func validateCreateTableRequest(req CreateTableRequest) error {
if req.Name == "" {
return errTableNameRequired
@ -1590,14 +1740,14 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
tableUUID := generatedLegacyUUID
baseMetadataVersion := 0
baseMetadataLocation := ""
var baseMetadata table.Metadata
stagedFileName := "v1.metadata.json"
metadataBucket, metadataPath, parseLocationErr := parseS3Location(location)
if parseLocationErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Invalid table location: "+parseLocationErr.Error())
return
}
stagedFileName := "v1.metadata.json"
stagedMetadataBytes, loadErr := s.loadMetadataFile(r.Context(), metadataBucket, metadataPath, stagedFileName)
if loadErr == nil && len(stagedMetadataBytes) > 0 {
stagedMetadata, parseErr := table.ParseMetadataBytes(stagedMetadataBytes)
@ -1605,191 +1755,43 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse staged metadata: "+parseErr.Error())
return
}
baseMetadata = stagedMetadata
baseMetadataLocation = fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), stagedFileName)
baseMetadataVersion = parseMetadataVersionFromLocation(baseMetadataLocation)
if stagedMetadata.TableUUID() != uuid.Nil {
tableUUID = stagedMetadata.TableUUID()
}
builder, builderErr := table.MetadataBuilderFromBase(stagedMetadata, baseMetadataLocation)
if builderErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
return
}
for _, update := range req.Updates {
if applyErr := update.Apply(builder); applyErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
return
}
}
newMetadata, buildErr := builder.Build()
if buildErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
return
}
metadataVersion := baseMetadataVersion + 1
if metadataVersion <= 0 {
metadataVersion = 1
}
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
metadataBytes, marshalErr := json.Marshal(newMetadata)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
return
}
metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
if marshalErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
return
}
newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
return
}
if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
return
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
Format: "ICEBERG",
Metadata: &s3tables.TableMetadata{
Iceberg: &s3tables.IcebergMetadata{
TableUUID: tableUUID.String(),
},
FullMetadata: metadataBytes,
},
MetadataVersion: metadataVersion,
MetadataLocation: newMetadataLocation,
}
createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
})
if createErr != nil {
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
}
if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
return
}
glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
return
}
if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
Metadata: newMetadata,
}
writeJSON(w, http.StatusOK, result)
return
}
if loadErr != nil && !errors.Is(loadErr, filer_pb.ErrNotFound) {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to load staged metadata: "+loadErr.Error())
return
}
currentMetadata := newTableMetadata(tableUUID, location, nil, nil, nil, nil)
if currentMetadata == nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
return
}
builder, builderErr := table.MetadataBuilderFromBase(currentMetadata, baseMetadataLocation)
if builderErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to create metadata builder: "+builderErr.Error())
return
}
for _, update := range req.Updates {
if applyErr := update.Apply(builder); applyErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply update: "+applyErr.Error())
if baseMetadata == nil {
baseMetadata = newTableMetadata(tableUUID, location, nil, nil, nil, nil)
if baseMetadata == nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to build current metadata")
return
}
}
newMetadata, buildErr := builder.Build()
if buildErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to build new metadata: "+buildErr.Error())
return
}
metadataVersion := 1
metadataFileName := fmt.Sprintf("v%d.metadata.json", metadataVersion)
newMetadataLocation := fmt.Sprintf("%s/metadata/%s", strings.TrimSuffix(location, "/"), metadataFileName)
metadataBytes, marshalErr := json.Marshal(newMetadata)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to serialize metadata: "+marshalErr.Error())
return
}
metadataBytes, marshalErr = applyStatisticsUpdates(metadataBytes, statisticsUpdates)
if marshalErr != nil {
writeError(w, http.StatusBadRequest, "BadRequestException", "Failed to apply statistics updates: "+marshalErr.Error())
return
}
newMetadata, marshalErr = table.ParseMetadataBytes(metadataBytes)
if marshalErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to parse committed metadata: "+marshalErr.Error())
return
}
if saveErr := s.saveMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName, metadataBytes); saveErr != nil {
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to save metadata file: "+saveErr.Error())
return
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: bucketARN,
Namespace: namespace,
Name: tableName,
Format: "ICEBERG",
Metadata: &s3tables.TableMetadata{
Iceberg: &s3tables.IcebergMetadata{
TableUUID: tableUUID.String(),
},
FullMetadata: metadataBytes,
},
MetadataVersion: metadataVersion,
MetadataLocation: newMetadataLocation,
}
createErr := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateTable", createReq, nil, identityName)
result, reqErr := s.finalizeCreateOnCommit(r.Context(), createOnCommitInput{
bucketARN: bucketARN,
namespace: namespace,
tableName: tableName,
identityName: identityName,
location: location,
tableUUID: tableUUID,
baseMetadata: baseMetadata,
baseMetadataLoc: baseMetadataLocation,
baseMetadataVer: baseMetadataVersion,
updates: req.Updates,
statisticsUpdates: statisticsUpdates,
})
if createErr != nil {
if cleanupErr := s.deleteMetadataFile(r.Context(), metadataBucket, metadataPath, metadataFileName); cleanupErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup metadata file %s after create-on-commit failure: %v", newMetadataLocation, cleanupErr)
}
if isS3TablesConflict(createErr) || isS3TablesAlreadyExists(createErr) {
writeError(w, http.StatusConflict, "CommitFailedException", "Table was created concurrently")
return
}
glog.Errorf("Iceberg: CommitTable CreateTable error: %v", createErr)
writeError(w, http.StatusInternalServerError, "InternalServerError", "Failed to commit table creation: "+createErr.Error())
if reqErr != nil {
writeError(w, reqErr.status, reqErr.errType, reqErr.message)
return
}
if markerErr := s.deleteStageCreateMarkers(r.Context(), metadataBucket, namespace, tableName); markerErr != nil {
glog.V(1).Infof("Iceberg: failed to cleanup stage-create markers for %s.%s after finalize: %v", encodeNamespace(namespace), tableName, markerErr)
}
result := CommitTableResponse{
MetadataLocation: newMetadataLocation,
Metadata: newMetadata,
}
writeJSON(w, http.StatusOK, result)
return
}

8
weed/s3api/iceberg/iceberg_stage_create_helpers_test.go

@ -31,9 +31,11 @@ func TestParseMetadataVersionFromLocation(t *testing.T) {
}
for _, tc := range testCases {
if got := parseMetadataVersionFromLocation(tc.location); got != tc.version {
t.Fatalf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version)
}
t.Run(tc.location, func(t *testing.T) {
if got := parseMetadataVersionFromLocation(tc.location); got != tc.version {
t.Errorf("parseMetadataVersionFromLocation(%q) = %d, want %d", tc.location, got, tc.version)
}
})
}
}

Loading…
Cancel
Save