Browse Source

iceberg: add stage-create rollout flag and marker pruning

pull/8279/head
Chris Lu 1 day ago
parent
commit
551da8274c
  1. 71
      weed/s3api/iceberg/iceberg.go
  2. 17
      weed/s3api/iceberg/iceberg_create_table_test.go

71
weed/s3api/iceberg/iceberg.go

@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math/rand/v2"
"net/http"
"net/url"
@ -325,6 +326,53 @@ func stageCreateMarkerDir(bucketName string, namespace []string, tableName strin
return path.Join(s3tables.TablesPath, bucketName, stageCreateMarkerDirName, stageCreateMarkerNamespaceKey(namespace), tableName)
}
func (s *Server) pruneExpiredStageCreateMarkers(ctx context.Context, bucketName string, namespace []string, tableName string) error {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
markerDir := stageCreateMarkerDir(bucketName, namespace, tableName)
now := time.Now().UTC()
return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.ListEntries(opCtx, &filer_pb.ListEntriesRequest{
Directory: markerDir,
Limit: 1024,
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) || strings.Contains(strings.ToLower(err.Error()), "not found") {
return nil
}
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr == io.EOF {
return nil
}
if recvErr != nil {
return recvErr
}
if resp.Entry == nil || resp.Entry.IsDirectory || len(resp.Entry.Content) == 0 {
continue
}
var marker stageCreateMarker
if err := json.Unmarshal(resp.Entry.Content, &marker); err != nil {
continue
}
expiresAt, err := time.Parse(time.RFC3339Nano, marker.ExpiresAt)
if err != nil || !expiresAt.Before(now) {
continue
}
if err := filer_pb.DoRemove(opCtx, client, markerDir, resp.Entry.Name, true, false, true, false, nil); err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
return err
}
}
})
}
func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string, namespace []string, tableName string, tableUUID uuid.UUID, location string) error {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
@ -340,6 +388,10 @@ func (s *Server) writeStageCreateMarker(ctx context.Context, bucketName string,
return err
}
if err := s.pruneExpiredStageCreateMarkers(ctx, bucketName, namespace, tableName); err != nil {
return err
}
return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ensureDir := func(parent, name, errorContext string) error {
_, lookupErr := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
@ -671,6 +723,19 @@ func validateCreateTableRequest(req CreateTableRequest) error {
return nil
}
func isStageCreateEnabled() bool {
value := strings.TrimSpace(strings.ToLower(os.Getenv("ICEBERG_ENABLE_STAGE_CREATE")))
if value == "" {
return true
}
switch value {
case "0", "false", "no", "off":
return false
default:
return true
}
}
// parseNamespace parses the namespace from path parameter.
// Iceberg uses unit separator (0x1F) for multi-level namespaces.
// Note: mux already decodes URL-encoded path parameters, so we only split by unit separator.
@ -1120,6 +1185,10 @@ func (s *Server) handleCreateTable(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "BadRequestException", err.Error())
return
}
if req.StageCreate && !isStageCreateEnabled() {
writeError(w, http.StatusNotImplemented, "NotImplementedException", "stage-create is disabled")
return
}
bucketName := getBucketFromPrefix(r)
bucketARN := buildTableBucketARN(bucketName)
@ -1489,7 +1558,7 @@ func (s *Server) handleUpdateTable(w http.ResponseWriter, r *http.Request) {
maxCommitAttempts := 3
generatedLegacyUUID := uuid.New()
canCreateOnCommit := hasAssertCreateRequirement(req.Requirements)
canCreateOnCommit := isStageCreateEnabled() && hasAssertCreateRequirement(req.Requirements)
for attempt := 1; attempt <= maxCommitAttempts; attempt++ {
getReq := &s3tables.GetTableRequest{
TableBucketARN: bucketARN,

17
weed/s3api/iceberg/iceberg_create_table_test.go

@ -18,3 +18,20 @@ func TestValidateCreateTableRequestAcceptsWithName(t *testing.T) {
t.Fatalf("validateCreateTableRequest() error = %v, want nil", err)
}
}
func TestIsStageCreateEnabledDefaultsToTrue(t *testing.T) {
t.Setenv("ICEBERG_ENABLE_STAGE_CREATE", "")
if !isStageCreateEnabled() {
t.Fatalf("isStageCreateEnabled() = false, want true")
}
}
func TestIsStageCreateEnabledFalseValues(t *testing.T) {
falseValues := []string{"0", "false", "FALSE", "no", "off"}
for _, value := range falseValues {
t.Setenv("ICEBERG_ENABLE_STAGE_CREATE", value)
if isStageCreateEnabled() {
t.Fatalf("isStageCreateEnabled() = true for value %q, want false", value)
}
}
}
Loading…
Cancel
Save