diff --git a/weed/credential/credential_manager.go b/weed/credential/credential_manager.go index 01f4a744d..cd912a5b2 100644 --- a/weed/credential/credential_manager.go +++ b/weed/credential/credential_manager.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/util" @@ -12,6 +13,11 @@ import ( "google.golang.org/grpc" ) +// FilerAddressSetter is an interface for credential stores that need a dynamic filer address +type FilerAddressSetter interface { + SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) +} + // CredentialManager manages user credentials using a configurable store type CredentialManager struct { store CredentialStore @@ -44,11 +50,17 @@ func NewCredentialManager(storeName CredentialStoreTypeName, configuration util. }, nil } -// SetMasterClient sets the master client to enable propagation of changes to S3 servers func (cm *CredentialManager) SetMasterClient(masterClient *wdclient.MasterClient, grpcDialOption grpc.DialOption) { cm.store = NewPropagatingCredentialStore(cm.store, masterClient, grpcDialOption) } +// SetFilerAddressFunc sets the function to get the current filer address +func (cm *CredentialManager) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) { + if s, ok := cm.store.(FilerAddressSetter); ok { + s.SetFilerAddressFunc(getFiler, grpcDialOption) + } +} + // GetStore returns the underlying credential store func (cm *CredentialManager) GetStore() CredentialStore { return cm.store diff --git a/weed/credential/propagating_store.go b/weed/credential/propagating_store.go index 68e1c730d..333c781b6 100644 --- a/weed/credential/propagating_store.go +++ b/weed/credential/propagating_store.go @@ -34,6 +34,12 @@ func NewPropagatingCredentialStore(upstream CredentialStore, masterClient *wdcli } } +func (s *PropagatingCredentialStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) { + if setter, ok := s.CredentialStore.(FilerAddressSetter); ok { + setter.SetFilerAddressFunc(getFiler, grpcDialOption) + } +} + func (s *PropagatingCredentialStore) propagateChange(ctx context.Context, fn func(context.Context, s3_pb.SeaweedS3IamCacheClient) error) { if s.masterClient == nil { return diff --git a/weed/s3api/iceberg/iceberg.go b/weed/s3api/iceberg/iceberg.go index 7b80482d8..8122b4627 100644 --- a/weed/s3api/iceberg/iceberg.go +++ b/weed/s3api/iceberg/iceberg.go @@ -164,50 +164,54 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, ta defer cancel() return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // 1. Ensure table directory exists: /table-buckets/// - tableDir := fmt.Sprintf("/table-buckets/%s/%s/%s", bucketName, namespace, tableName) - resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ - Directory: fmt.Sprintf("/table-buckets/%s/%s", bucketName, namespace), - Entry: &filer_pb.Entry{ - Name: tableName, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0755 | os.ModeDir), + ensureDir := func(parent, name, errorContext string) error { + _, err := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: parent, + Name: name, + }) + if err == nil { + return nil + } + if err != filer_pb.ErrNotFound { + return fmt.Errorf("lookup %s failed: %w", errorContext, err) + } + + // If lookup fails with ErrNotFound, try to create the directory. + resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ + Directory: parent, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0755 | os.ModeDir), + }, }, - }, - }) - if err != nil { - return fmt.Errorf("failed to create table directory: %w", err) + }) + if createErr != nil { + return fmt.Errorf("failed to create %s: %w", errorContext, createErr) + } + if resp.Error != "" && !strings.Contains(resp.Error, "exist") { + return fmt.Errorf("failed to create %s: %s", errorContext, resp.Error) + } + return nil } - if resp.Error != "" && !strings.Contains(resp.Error, "exist") { - return fmt.Errorf("failed to create table directory: %s", resp.Error) + + // 1. Ensure table directory exists: /table-buckets///
+ tableDir := fmt.Sprintf("/table-buckets/%s/%s/%s", bucketName, namespace, tableName) + if err := ensureDir(fmt.Sprintf("/table-buckets/%s/%s", bucketName, namespace), tableName, "table directory"); err != nil { + return err } // 2. Ensure metadata directory exists: /table-buckets///
/metadata metadataDir := fmt.Sprintf("%s/metadata", tableDir) - resp, err = client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ - Directory: tableDir, - Entry: &filer_pb.Entry{ - Name: "metadata", - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0755 | os.ModeDir), - }, - }, - }) - if err != nil { - return fmt.Errorf("failed to create metadata directory: %w", err) - } - if resp.Error != "" && !strings.Contains(resp.Error, "exist") { - return fmt.Errorf("failed to create metadata directory: %s", resp.Error) + if err := ensureDir(tableDir, "metadata", "metadata directory"); err != nil { + return err } // 3. Write the file - resp, err = client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ + resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{ Directory: metadataDir, Entry: &filer_pb.Entry{ Name: metadataFileName, @@ -224,7 +228,7 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, ta }, }) if err != nil { - return fmt.Errorf("failed to write metadata file context: %w", err) + return fmt.Errorf("failed to write metadata file: %w", err) } if resp.Error != "" { return fmt.Errorf("failed to write metadata file: %s", resp.Error) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3ec247ffd..757374ea1 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -257,6 +257,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) if fs.CredentialManager != nil { + fs.CredentialManager.SetFilerAddressFunc(func() pb.ServerAddress { + return fs.option.Host + }, fs.grpcDialOption) fs.CredentialManager.SetMasterClient(fs.filer.MasterClient, fs.grpcDialOption) }