Browse Source

Fix: filer not yet available in s3.configure (#8198)

* Fix: Initialize filer CredentialManager with filer address

* The fix involves checking for directory existence before creation.

* adjust error message

* Fix: Implement FilerAddressSetter in PropagatingCredentialStore

* Refactor: Reorder credential manager initialization in filer server

* refactor
master
Chris Lu 17 hours ago
committed by GitHub
parent
commit
f66a23b472
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 14
      weed/credential/credential_manager.go
  2. 6
      weed/credential/propagating_store.go
  3. 76
      weed/s3api/iceberg/iceberg.go
  4. 3
      weed/server/filer_server.go

14
weed/credential/credential_manager.go

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
@ -12,6 +13,11 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
// FilerAddressSetter is an interface for credential stores that need a dynamic filer address
type FilerAddressSetter interface {
SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption)
}
// CredentialManager manages user credentials using a configurable store // CredentialManager manages user credentials using a configurable store
type CredentialManager struct { type CredentialManager struct {
store CredentialStore store CredentialStore
@ -44,11 +50,17 @@ func NewCredentialManager(storeName CredentialStoreTypeName, configuration util.
}, nil }, nil
} }
// SetMasterClient sets the master client to enable propagation of changes to S3 servers
func (cm *CredentialManager) SetMasterClient(masterClient *wdclient.MasterClient, grpcDialOption grpc.DialOption) { func (cm *CredentialManager) SetMasterClient(masterClient *wdclient.MasterClient, grpcDialOption grpc.DialOption) {
cm.store = NewPropagatingCredentialStore(cm.store, masterClient, grpcDialOption) cm.store = NewPropagatingCredentialStore(cm.store, masterClient, grpcDialOption)
} }
// SetFilerAddressFunc sets the function to get the current filer address
func (cm *CredentialManager) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) {
if s, ok := cm.store.(FilerAddressSetter); ok {
s.SetFilerAddressFunc(getFiler, grpcDialOption)
}
}
// GetStore returns the underlying credential store // GetStore returns the underlying credential store
func (cm *CredentialManager) GetStore() CredentialStore { func (cm *CredentialManager) GetStore() CredentialStore {
return cm.store return cm.store

6
weed/credential/propagating_store.go

@ -34,6 +34,12 @@ func NewPropagatingCredentialStore(upstream CredentialStore, masterClient *wdcli
} }
} }
func (s *PropagatingCredentialStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) {
if setter, ok := s.CredentialStore.(FilerAddressSetter); ok {
setter.SetFilerAddressFunc(getFiler, grpcDialOption)
}
}
func (s *PropagatingCredentialStore) propagateChange(ctx context.Context, fn func(context.Context, s3_pb.SeaweedS3IamCacheClient) error) { func (s *PropagatingCredentialStore) propagateChange(ctx context.Context, fn func(context.Context, s3_pb.SeaweedS3IamCacheClient) error) {
if s.masterClient == nil { if s.masterClient == nil {
return return

76
weed/s3api/iceberg/iceberg.go

@ -164,50 +164,54 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, ta
defer cancel() defer cancel()
return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// 1. Ensure table directory exists: /table-buckets/<bucket>/<namespace>/<table>
tableDir := fmt.Sprintf("/table-buckets/%s/%s/%s", bucketName, namespace, tableName)
resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
Directory: fmt.Sprintf("/table-buckets/%s/%s", bucketName, namespace),
Entry: &filer_pb.Entry{
Name: tableName,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0755 | os.ModeDir),
ensureDir := func(parent, name, errorContext string) error {
_, err := filer_pb.LookupEntry(opCtx, client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
Name: name,
})
if err == nil {
return nil
}
if err != filer_pb.ErrNotFound {
return fmt.Errorf("lookup %s failed: %w", errorContext, err)
}
// If lookup fails with ErrNotFound, try to create the directory.
resp, createErr := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
Directory: parent,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0755 | os.ModeDir),
},
}, },
},
})
if err != nil {
return fmt.Errorf("failed to create table directory: %w", err)
})
if createErr != nil {
return fmt.Errorf("failed to create %s: %w", errorContext, createErr)
}
if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
return fmt.Errorf("failed to create %s: %s", errorContext, resp.Error)
}
return nil
} }
if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
return fmt.Errorf("failed to create table directory: %s", resp.Error)
// 1. Ensure table directory exists: /table-buckets/<bucket>/<namespace>/<table>
tableDir := fmt.Sprintf("/table-buckets/%s/%s/%s", bucketName, namespace, tableName)
if err := ensureDir(fmt.Sprintf("/table-buckets/%s/%s", bucketName, namespace), tableName, "table directory"); err != nil {
return err
} }
// 2. Ensure metadata directory exists: /table-buckets/<bucket>/<namespace>/<table>/metadata // 2. Ensure metadata directory exists: /table-buckets/<bucket>/<namespace>/<table>/metadata
metadataDir := fmt.Sprintf("%s/metadata", tableDir) metadataDir := fmt.Sprintf("%s/metadata", tableDir)
resp, err = client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
Directory: tableDir,
Entry: &filer_pb.Entry{
Name: "metadata",
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0755 | os.ModeDir),
},
},
})
if err != nil {
return fmt.Errorf("failed to create metadata directory: %w", err)
}
if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
return fmt.Errorf("failed to create metadata directory: %s", resp.Error)
if err := ensureDir(tableDir, "metadata", "metadata directory"); err != nil {
return err
} }
// 3. Write the file // 3. Write the file
resp, err = client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
resp, err := client.CreateEntry(opCtx, &filer_pb.CreateEntryRequest{
Directory: metadataDir, Directory: metadataDir,
Entry: &filer_pb.Entry{ Entry: &filer_pb.Entry{
Name: metadataFileName, Name: metadataFileName,
@ -224,7 +228,7 @@ func (s *Server) saveMetadataFile(ctx context.Context, bucketName, namespace, ta
}, },
}) })
if err != nil { if err != nil {
return fmt.Errorf("failed to write metadata file context: %w", err)
return fmt.Errorf("failed to write metadata file: %w", err)
} }
if resp.Error != "" { if resp.Error != "" {
return fmt.Errorf("failed to write metadata file: %s", resp.Error) return fmt.Errorf("failed to write metadata file: %s", resp.Error)

3
weed/server/filer_server.go

@ -257,6 +257,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
if fs.CredentialManager != nil { if fs.CredentialManager != nil {
fs.CredentialManager.SetFilerAddressFunc(func() pb.ServerAddress {
return fs.option.Host
}, fs.grpcDialOption)
fs.CredentialManager.SetMasterClient(fs.filer.MasterClient, fs.grpcDialOption) fs.CredentialManager.SetMasterClient(fs.filer.MasterClient, fs.grpcDialOption)
} }

Loading…
Cancel
Save