Browse Source

Wire IAM gRPC service to filer server

- Add CredentialManager field to FilerOption and FilerServer
- Import credential store implementations in filer command
- Initialize CredentialManager from credential.toml if available
- Register IAM gRPC service on filer gRPC server
- Enable credential management via gRPC alongside existing filer services
add-iam-grpc-management
Chris Lu 3 weeks ago
parent
commit
5798b6bc3a
  1. 32
      weed/command/filer.go
  2. 6
      weed/server/filer_server.go

32
weed/command/filer.go

@ -18,10 +18,15 @@ import (
"google.golang.org/grpc/credentials/tls/certprovider/pemfile" "google.golang.org/grpc/credentials/tls/certprovider/pemfile"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/credential"
_ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
_ "github.com/seaweedfs/seaweedfs/weed/credential/postgres"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
weed_server "github.com/seaweedfs/seaweedfs/weed/server" weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
@ -324,6 +329,24 @@ func (fo *FilerOptions) startFiler() {
filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
// Initialize credential manager for IAM gRPC service
var credentialManager *credential.CredentialManager
credConfig, err := credential.LoadCredentialConfiguration()
if err == nil && credConfig != nil {
credentialManager, err = credential.NewCredentialManager(
credential.CredentialStoreTypeName(credConfig.Store),
credConfig.Config,
credConfig.Prefix,
)
if err != nil {
glog.Warningf("Failed to initialize credential manager: %v", err)
} else {
glog.V(0).Infof("Initialized credential manager with store: %s", credConfig.Store)
}
} else {
glog.V(1).Info("No credential store configured for filer")
}
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
Masters: fo.masters, Masters: fo.masters,
FilerGroup: *fo.filerGroup, FilerGroup: *fo.filerGroup,
@ -346,6 +369,7 @@ func (fo *FilerOptions) startFiler() {
DiskType: *fo.diskType, DiskType: *fo.diskType,
AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
TusBasePath: *fo.tusBasePath, TusBasePath: *fo.tusBasePath,
CredentialManager: credentialManager,
}) })
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err) glog.Fatalf("Filer startup error: %v", nfs_err)
@ -389,6 +413,14 @@ func (fo *FilerOptions) startFiler() {
} }
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs) filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
// Register IAM gRPC service if credential manager is available
if credentialManager != nil {
iamGrpcServer := weed_server.NewIamGrpcServer(credentialManager)
iam_pb.RegisterSeaweedIdentityAccessManagementServer(grpcS, iamGrpcServer)
glog.V(0).Info("Registered IAM gRPC service on filer")
}
reflection.Register(grpcS) reflection.Register(grpcS)
if grpcLocalL != nil { if grpcLocalL != nil {
go grpcS.Serve(grpcLocalL) go grpcS.Serve(grpcLocalL)

6
weed/server/filer_server.go

@ -10,6 +10,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/stats"
"golang.org/x/sync/singleflight" "golang.org/x/sync/singleflight"
@ -81,6 +82,7 @@ type FilerOption struct {
AllowedOrigins []string AllowedOrigins []string
ExposeDirectoryData bool ExposeDirectoryData bool
TusBasePath string TusBasePath string
CredentialManager *credential.CredentialManager
} }
type FilerServer struct { type FilerServer struct {
@ -112,6 +114,9 @@ type FilerServer struct {
// deduplicates concurrent remote object caching operations // deduplicates concurrent remote object caching operations
remoteCacheGroup singleflight.Group remoteCacheGroup singleflight.Group
// credential manager for IAM operations
credentialManager *credential.CredentialManager
} }
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
@ -148,6 +153,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
knownListeners: make(map[int32]int32), knownListeners: make(map[int32]int32),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
credentialManager: option.CredentialManager,
} }
fs.listenersCond = sync.NewCond(&fs.listenersLock) fs.listenersCond = sync.NewCond(&fs.listenersLock)

Loading…
Cancel
Save